Skip to content

Commit eb9391b

Browse files
committed
[FIX] Refactoring of some names of methods
1 parent 7de202e commit eb9391b

File tree

6 files changed

+174
-12
lines changed

6 files changed

+174
-12
lines changed

tests/graph_tests/test_simple.cpp

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/**************************************************************************************
2+
* Copyright (c) 2019- Gabriele Mencagli
3+
*
4+
* This file is part of WindFlow.
5+
*
6+
* WindFlow is free software dual licensed under the GNU LGPL or MIT License.
7+
* You can redistribute it and/or modify it under the terms of the
8+
* * GNU Lesser General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version
11+
* OR
12+
* * MIT License: https://github.com/ParaGroup/WindFlow/blob/master/LICENSE.MIT
13+
*
14+
* WindFlow is distributed in the hope that it will be useful,
15+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
16+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17+
* GNU Lesser General Public License for more details.
18+
* You should have received a copy of the GNU Lesser General Public License and
19+
* the MIT License along with WindFlow. If not, see <http://www.gnu.org/licenses/>
20+
* and <http://opensource.org/licenses/MIT/>.
21+
**************************************************************************************
22+
*/
23+
24+
// include
25+
#include<random>
26+
#include<iostream>
27+
#include<windflow.hpp>
28+
29+
using namespace std;
30+
using namespace wf;
31+
32+
// global variable for the result
33+
atomic<long> sent_tuples;
34+
35+
// Struct of the input tuple
36+
struct tuple_t
37+
{
38+
int64_t key;
39+
int64_t value;
40+
};
41+
42+
// Source functor for generating positive numbers
43+
class Source_Functor
44+
{
45+
private:
46+
uint64_t next_ts; // next timestamp
47+
unsigned long app_start_time;
48+
unsigned long current_time;
49+
long generated_tuples;
50+
size_t batch_size;
51+
52+
public:
53+
// Constructor
54+
Source_Functor(const unsigned long _app_start_time,
55+
size_t _batch_size):
56+
next_ts(0),
57+
batch_size(_batch_size),
58+
app_start_time(_app_start_time),
59+
current_time(_app_start_time),
60+
generated_tuples(0) {}
61+
62+
// operator()
63+
void operator()(Source_Shipper<tuple_t> &shipper)
64+
{
65+
// static thread_local std::mt19937 generator;
66+
// std::uniform_int_distribution<int> distribution(0, 500);
67+
current_time = current_time_nsecs(); // get the current time
68+
while (current_time - app_start_time <= 60e9) { // generation loop
69+
tuple_t t;
70+
t.key = 10;
71+
t.value = 10;
72+
shipper.pushWithTimestamp(std::move(t), next_ts);
73+
shipper.setNextWatermark(next_ts);
74+
auto offset = 250; // (distribution(generator)+1);
75+
next_ts += offset;
76+
generated_tuples++;
77+
if ((batch_size > 0) && (generated_tuples % (100000 * batch_size) == 0)) {
78+
current_time = current_time_nsecs(); // get the new current time
79+
}
80+
if (batch_size == 0) {
81+
current_time = current_time_nsecs(); // get the new current time
82+
}
83+
}
84+
sent_tuples.fetch_add(generated_tuples); // save the number of generated tuples
85+
}
86+
};
87+
88+
// Sink functor
89+
class Sink_Functor
90+
{
91+
private:
92+
size_t received; // counter of received results
93+
long totalsum;
94+
95+
public:
96+
// Constructor
97+
Sink_Functor():
98+
received(0),
99+
totalsum(0) {}
100+
101+
// operator()
102+
void operator()(optional<reference_wrapper<tuple_t>> out, RuntimeContext &rc)
103+
{
104+
if (out) {
105+
received++;
106+
totalsum += ((*out).get()).value;
107+
}
108+
}
109+
};
110+
111+
// main
112+
int main(int argc, char *argv[])
113+
{
114+
int option = 0;
115+
// initalize global variable
116+
sent_tuples = 0;
117+
size_t pardegree = 1;
118+
size_t batch_size = 0;
119+
// arguments from command line
120+
if (argc != 5) {
121+
cout << argv[0] << " -n [par] -b [batch_size]" << endl;
122+
exit(EXIT_SUCCESS);
123+
}
124+
while ((option = getopt(argc, argv, "n:b:")) != -1) {
125+
switch (option) {
126+
case 'n': pardegree = atoi(optarg);
127+
break;
128+
case 'b': batch_size = atoi(optarg);
129+
break;
130+
default: {
131+
cout << argv[0] << " -n [par] -b [batch_size]" << endl;
132+
exit(EXIT_SUCCESS);
133+
}
134+
}
135+
}
136+
// prepare the test
137+
PipeGraph graph("test_simple", Execution_Mode_t::DEFAULT, Time_Policy_t::EVENT_TIME);
138+
// application starting time
139+
unsigned long app_start_time = current_time_nsecs();
140+
// prepare the first MultiPipe
141+
Source_Functor source_functor(app_start_time, batch_size);
142+
Source source = Source_Builder(source_functor)
143+
.withName("source1")
144+
.withParallelism(pardegree)
145+
.withOutputBatchSize(batch_size)
146+
.build();
147+
MultiPipe &pipe = graph.add_source(source);
148+
Sink_Functor sink_functor;
149+
Sink sink = Sink_Builder(sink_functor)
150+
.withName("sink")
151+
.withParallelism(pardegree)
152+
.build();
153+
pipe.chain_sink(sink);
154+
/// evaluate topology execution time
155+
volatile unsigned long start_time_main_usecs = current_time_usecs();
156+
graph.run();
157+
volatile unsigned long end_time_main_usecs = current_time_usecs();
158+
double elapsed_time_seconds = (end_time_main_usecs - start_time_main_usecs) / (1000000.0);
159+
double throughput = sent_tuples / elapsed_time_seconds;
160+
cout << "Measured throughput: " << (int) throughput << " tuples/second" << endl;
161+
return 0;
162+
}

tests/join_tests/test_join_3.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ int main(int argc, char *argv[])
183183
.withOutputBatchSize(dist_b(rng))
184184
.withKeyBy([](const tuple_t &t) -> size_t { return t.key; })
185185
.withBoundaries(milliseconds(lower_bound), milliseconds(upper_bound))
186-
.withDPSMode()
186+
.withDPMode()
187187
.build();
188188
pipe3.add(join);
189189
// prepare the fourth MultiPipe
@@ -309,7 +309,7 @@ int main(int argc, char *argv[])
309309
.withParallelism(join_degree)
310310
.withKeyBy([](const tuple_t &t) -> size_t { return t.key; })
311311
.withBoundaries(milliseconds(lower_bound), milliseconds(upper_bound))
312-
.withDPSMode()
312+
.withDPMode()
313313
.build();
314314
pipe3.add(join);
315315
// prepare the fourth MultiPipe

tests/join_tests/test_join_4.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ int main(int argc, char *argv[])
220220
.withOutputBatchSize(dist_b(rng))
221221
.withKeyBy([](const tuple_t &t) -> size_t { return t.key; })
222222
.withBoundaries(milliseconds(lower_bound), milliseconds(upper_bound))
223-
.withDPSMode()
223+
.withDPMode()
224224
.build();
225225
pipe4.add(join);
226226
Filter_Functor filter_functor3(6);
@@ -379,7 +379,7 @@ int main(int argc, char *argv[])
379379
.withParallelism(join_degree)
380380
.withKeyBy([](const tuple_t &t) -> size_t { return t.key; })
381381
.withBoundaries(milliseconds(lower_bound), milliseconds(upper_bound))
382-
.withDPSMode()
382+
.withDPMode()
383383
.build();
384384
pipe4.add(join);
385385
Filter_Functor filter_functor3(6);

tests/join_tests/test_join_5.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ int main(int argc, char *argv[])
369369
.withOutputBatchSize(dist_b(rng))
370370
.withKeyBy([](const tuple_t &t) -> size_t { return t.key; })
371371
.withBoundaries(milliseconds(lower_bound), milliseconds(upper_bound))
372-
.withDPSMode()
372+
.withDPMode()
373373
.build();
374374
pipe9.add(join2);
375375
Sink_Functor sink_functor;
@@ -631,7 +631,7 @@ int main(int argc, char *argv[])
631631
.withParallelism(join2_degree)
632632
.withKeyBy([](const tuple_t &t) -> size_t { return t.key; })
633633
.withBoundaries(milliseconds(lower_bound), milliseconds(upper_bound))
634-
.withDPSMode()
634+
.withDPMode()
635635
.build();
636636
pipe9.add(join2);
637637
Sink_Functor sink_functor;

wf/builders.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,14 +1517,14 @@ class Interval_Join_Builder: public Basic_Builder<Interval_Join_Builder, join_fu
15171517
*
15181518
* \return a reference to the builder object
15191519
*/
1520-
auto &withDPSMode()
1520+
auto &withDPMode()
15211521
{
15221522
if (!isKeyBySet) {
15231523
std::cerr << RED << "WindFlow Error: Interval_Join with data parallelism mode requires a key extractor" << DEFAULT_COLOR << std::endl;
15241524
exit(EXIT_FAILURE);
15251525
}
15261526
if (join_mode != Join_Mode_t::NONE) {
1527-
std::cerr << RED << "WindFlow Error: wrong use of withDPSMode() in the Interval_Join_Builder, you can specify only one mode per join operator " << DEFAULT_COLOR << std::endl;
1527+
std::cerr << RED << "WindFlow Error: wrong use of withDPMode() in the Interval_Join_Builder, you can specify only one mode per join operator " << DEFAULT_COLOR << std::endl;
15281528
exit(EXIT_FAILURE);
15291529
}
15301530
input_routing_mode = Routing_Mode_t::BROADCAST;

wf/interval_join.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,17 +296,17 @@ class IJoin_Replica: public Basic_Replica
296296
}
297297
Key_Descriptor &key_d = (*it).second;
298298
uint64_t l_b = 0;
299-
if (isStreamA(_tag)) {
299+
if (isStreamA(_tag)) { // base
300300
if (-lower_bound <= static_cast<int64_t>(_timestamp)) { l_b = _timestamp + lower_bound; }
301301
}
302-
else {
302+
else { // probe
303303
if (upper_bound <= static_cast<int64_t>(_timestamp)) { l_b = _timestamp - upper_bound; }
304304
}
305305
uint64_t u_b = 0;
306-
if (isStreamA(_tag)) {
306+
if (isStreamA(_tag)) { // base
307307
if (-upper_bound <= static_cast<int64_t>(_timestamp)) { u_b = _timestamp + upper_bound; }
308308
}
309-
else {
309+
else { // probe
310310
if (lower_bound <= static_cast<int64_t>(_timestamp)) { u_b = _timestamp - lower_bound; }
311311
}
312312
std::optional<result_t> output;

0 commit comments

Comments
 (0)