Skip to content

Commit 7d87c8c

Browse files
committed
[FIX] changed the code to avoid some underlying bugs of FastFlow using A2As
1 parent 5e04e75 commit 7d87c8c

File tree

11 files changed

+37
-213
lines changed

11 files changed

+37
-213
lines changed

wf/accumulator.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
#include<unordered_map>
4040
#include<ff/node.hpp>
4141
#include<ff/pipeline.hpp>
42-
#include<ff/multinode.hpp>
4342
#include<ff/farm.hpp>
4443
#include<basic.hpp>
4544
#include<context.hpp>
@@ -82,7 +81,7 @@ class Accumulator: public ff::ff_farm, public Basic_Operator
8281
size_t parallelism; // internal parallelism of the Accumulator
8382
bool used; // true if the Accumulator has been added/chained in a MultiPipe
8483
// class Accumulator_Node
85-
class Accumulator_Node: public ff::ff_minode_t<tuple_t, result_t>
84+
class Accumulator_Node: public ff::ff_node_t<tuple_t, result_t>
8685
{
8786
private:
8887
acc_func_t acc_func; // reduce/fold function

wf/filter.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
#include<string>
3939
#include<ff/node.hpp>
4040
#include<ff/pipeline.hpp>
41-
#include<ff/multinode.hpp>
4241
#include<ff/farm.hpp>
4342
#include<basic.hpp>
4443
#include<context.hpp>
@@ -87,7 +86,7 @@ class Filter: public ff::ff_farm, public Basic_Operator
8786
bool keyed; // flag stating whether the Filter is configured with keyBy or not
8887
bool used; // true if the Filter has been added/chained in a MultiPipe
8988
// class Filter_Node
90-
class Filter_Node: public ff::ff_minode_t<tuple_t, result_t>
89+
class Filter_Node: public ff::ff_node_t<tuple_t, result_t>
9190
{
9291
private:
9392
filter_func_t filter_func; // filter function (with boolean return type)

wf/flatmap.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
#include<string>
3939
#include<ff/node.hpp>
4040
#include<ff/pipeline.hpp>
41-
#include<ff/multinode.hpp>
4241
#include<ff/farm.hpp>
4342
#include<basic.hpp>
4443
#include<shipper.hpp>
@@ -80,7 +79,7 @@ class FlatMap: public ff::ff_farm, public Basic_Operator
8079
bool keyed; // flag stating whether the FlatMap is configured with keyBy or not
8180
bool used; // true if the FlatMap has been added/chained in a MultiPipe
8281
// class FlatMap_Node
83-
class FlatMap_Node: public ff::ff_minode_t<tuple_t, result_t>
82+
class FlatMap_Node: public ff::ff_node_t<tuple_t, result_t>
8483
{
8584
private:
8685
flatmap_func_t flatmap_func; // flatmap function

wf/map.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
#include<string>
3939
#include<ff/node.hpp>
4040
#include<ff/pipeline.hpp>
41-
#include<ff/multinode.hpp>
4241
#include<ff/farm.hpp>
4342
#include<basic.hpp>
4443
#include<context.hpp>
@@ -83,7 +82,7 @@ class Map: public ff::ff_farm, public Basic_Operator
8382
bool keyed; // flag stating whether the Map is configured with keyBy or not
8483
bool used; // true if the Map has been added/chained in a MultiPipe
8584
// class Map_Node
86-
class Map_Node: public ff::ff_minode_t<tuple_t, result_t>
85+
class Map_Node: public ff::ff_node_t<tuple_t, result_t>
8786
{
8887
private:
8988
map_func_ip_t func_ip; // in-place map function

wf/multipipe.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,10 @@ class MultiPipe: public ff::ff_pipeline
268268
collector_t *collector = new collector_t(_ordering, atomic_num_dropped);
269269
combine_with_firststage(*stage, collector, true); // add the ordering_node / kslack_node
270270
}
271+
else {
272+
dummy_mi *collector = new dummy_mi();
273+
combine_with_firststage(*stage, collector, true); // dummy multi-input node
274+
}
271275
first_set.push_back(stage);
272276
}
273277
matrioska->add_firstset(first_set, 0, true);
@@ -318,6 +322,10 @@ class MultiPipe: public ff::ff_pipeline
318322
collector_t *collector = new collector_t(_ordering, atomic_num_dropped);
319323
combine_with_firststage(*stage, collector, true); // add the ordering_node / kslack_node
320324
}
325+
else {
326+
dummy_mi *collector = new dummy_mi();
327+
combine_with_firststage(*stage, collector, true); // dummy multi-input node
328+
}
321329
first_set.push_back(stage);
322330
}
323331
matrioska->add_firstset(first_set, 0, true);

wf/sink.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
#include<ff/node.hpp>
4646
#include<ff/combine.hpp>
4747
#include<ff/pipeline.hpp>
48-
#include<ff/multinode.hpp>
4948
#include<ff/farm.hpp>
5049
#include<basic.hpp>
5150
#include<context.hpp>
@@ -90,7 +89,7 @@ class Sink: public ff::ff_farm, public Basic_Operator
9089
bool keyed; // flag stating whether the Sink is configured with keyBy or not
9190
bool used; // true if the Sink has been added/chained in a MultiPipe
9291
// class Sink_Node
93-
class Sink_Node: public ff::ff_minode_t<tuple_t>
92+
class Sink_Node: public ff::ff_node_t<tuple_t>
9493
{
9594
private:
9695
sink_func_t sink_func; // sink function (receiving a reference to an optional containing the input)

wf/test_mp_gpu_kff_cb.cpp

Lines changed: 0 additions & 175 deletions
This file was deleted.

wf/win_seq.hpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
#include<unordered_map>
4242
#include<math.h>
4343
#include<ff/node.hpp>
44-
#include<ff/multinode.hpp>
4544
#include<meta.hpp>
4645
#include<window.hpp>
4746
#include<context.hpp>
@@ -55,7 +54,7 @@ namespace wf {
5554

5655
// Win_Seq class
5756
template<typename tuple_t, typename result_t, typename input_t>
58-
class Win_Seq: public ff::ff_minode_t<input_t, result_t>
57+
class Win_Seq: public ff::ff_node_t<input_t, result_t>
5958
{
6059
public:
6160
// type of the non-incremental window processing function
@@ -485,9 +484,9 @@ class Win_Seq: public ff::ff_minode_t<input_t, result_t>
485484
{
486485
eos_received++;
487486
// check the number of received EOS messages
488-
if ((eos_received != this->get_num_inchannels()) && (this->get_num_inchannels() != 0)) { // workaround due to FastFlow
489-
return;
490-
}
487+
//if ((eos_received != this->get_num_inchannels()) && (this->get_num_inchannels() != 0)) { // workaround due to FastFlow
488+
// return;
489+
//}
491490
// iterate over all the keys
492491
for (auto &k: keyMap) {
493492
auto &wins = (k.second).wins;
@@ -568,13 +567,13 @@ class Win_Seq: public ff::ff_minode_t<input_t, result_t>
568567
// method to start the node execution asynchronously
569568
int run(bool) override
570569
{
571-
return ff::ff_minode::run();
570+
return ff::ff_node::run();
572571
}
573572

574573
// method to wait the node termination
575574
int wait() override
576575
{
577-
return ff::ff_minode::wait();
576+
return ff::ff_node::wait();
578577
}
579578
};
580579

wf/win_seq_gpu.hpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
#include<unordered_map>
4343
#include<math.h>
4444
#include<ff/node.hpp>
45-
#include<ff/multinode.hpp>
4645
#include<meta.hpp>
4746
#include<window.hpp>
4847
#include<meta_gpu.hpp>
@@ -82,7 +81,7 @@ __global__ void ComputeBatch_Kernel(void *input_data,
8281

8382
// Win_Seq_GPU class
8483
template<typename tuple_t, typename result_t, typename win_F_t, typename input_t>
85-
class Win_Seq_GPU: public ff::ff_minode_t<input_t, result_t>
84+
class Win_Seq_GPU: public ff::ff_node_t<input_t, result_t>
8685
{
8786
private:
8887
// type of the stream archive used by the Win_Seq_GPU node
@@ -614,9 +613,9 @@ class Win_Seq_GPU: public ff::ff_minode_t<input_t, result_t>
614613
{
615614
eos_received++;
616615
// check the number of received EOS messages
617-
if ((eos_received != this->get_num_inchannels()) && (this->get_num_inchannels() != 0)) { // workaround due to FastFlow
618-
return;
619-
}
616+
//if ((eos_received != this->get_num_inchannels()) && (this->get_num_inchannels() != 0)) { // workaround due to FastFlow
617+
// return;
618+
//}
620619
// emit results of the previously running kernel on the GPU
621620
waitAndFlush();
622621
// allocate on the CPU the scratchpad_memory
@@ -709,13 +708,13 @@ class Win_Seq_GPU: public ff::ff_minode_t<input_t, result_t>
709708
// method to start the node execution asynchronously
710709
int run(bool) override
711710
{
712-
return ff::ff_minode::run();
711+
return ff::ff_node::run();
713712
}
714713

715714
// method to wait the node termination
716715
int wait() override
717716
{
718-
return ff::ff_minode::wait();
717+
return ff::ff_node::wait();
719718
}
720719
};
721720

0 commit comments

Comments
 (0)