Skip to content

Commit fa0c360

Browse files
committed
[FIX] Fixed some bugs in the Reduce operator
1 parent 037e602 commit fa0c360

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

wf/reduce.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ class Reduce_Replica: public ff::ff_monode
5353
private:
5454
reduce_func_t func; // functional logic used by the Reduce replica
5555
key_extractor_func_t key_extr; // logic to extract the key attribute from the tuple_t
56-
using tuple_t = decltype(get_tuple_t_Acc(func)); // extracting the tuple_t type and checking the admissible signatures
57-
using state_t = decltype(get_state_t_Acc(func)); // extracting the state_t type and checking the admissible signatures
56+
using tuple_t = decltype(get_tuple_t_Reduce(func)); // extracting the tuple_t type and checking the admissible signatures
57+
using state_t = decltype(get_state_t_Reduce(func)); // extracting the state_t type and checking the admissible signatures
5858
using key_t = decltype(get_key_t_KeyExtr(key_extr)); // extracting the key_t type and checking the admissible singatures
5959
// static predicates to check the type of the functional logic to be invoked
6060
static constexpr bool isNonRiched = std::is_invocable<decltype(func), const tuple_t &, state_t &>::value;
@@ -220,7 +220,7 @@ class Reduce_Replica: public ff::ff_monode
220220
}
221221
#endif
222222
if (input_batching) { // receiving a batch
223-
Batch_t<decltype(get_tuple_t_Acc(func))> *batch_input = reinterpret_cast<Batch_t<decltype(get_tuple_t_Acc(func))> *>(_in);
223+
Batch_t<decltype(get_tuple_t_Reduce(func))> *batch_input = reinterpret_cast<Batch_t<decltype(get_tuple_t_Reduce(func))> *>(_in);
224224
if (batch_input->isPunct()) { // if it is a punctuaton
225225
emitter->generate_punctuation(batch_input->getWatermark(context.getReplicaIndex()), this); // propagate the received punctuation
226226
deleteBatch_t(batch_input); // delete the punctuaton
@@ -238,7 +238,7 @@ class Reduce_Replica: public ff::ff_monode
238238
deleteBatch_t(batch_input); // delete the input batch
239239
}
240240
else { // receiving a single input
241-
Single_t<decltype(get_tuple_t_Acc(func))> *input = reinterpret_cast<Single_t<decltype(get_tuple_t_Acc(func))> *>(_in);
241+
Single_t<decltype(get_tuple_t_Reduce(func))> *input = reinterpret_cast<Single_t<decltype(get_tuple_t_Reduce(func))> *>(_in);
242242
if (input->isPunct()) { // if it is a punctuaton
243243
emitter->generate_punctuation(input->getWatermark(context.getReplicaIndex()), this); // propagate the received punctuation
244244
deleteSingle_t(input); // delete the punctuaton
@@ -285,7 +285,7 @@ class Reduce_Replica: public ff::ff_monode
285285
context.setContextParameters(_timestamp, _watermark); // set the parameter of the RuntimeContext
286286
func(_tuple, (*it).second, context);
287287
}
288-
decltype(get_state_t_Acc(func)) result = (*it).second; // the result is a copy of the present state
288+
decltype(get_state_t_Reduce(func)) result = (*it).second; // the result is a copy of the present state
289289
emitter->emit(&result, 0, _timestamp, _watermark, this);
290290
}
291291

@@ -356,7 +356,7 @@ class Reduce: public Basic_Operator
356356
friend class PipeGraph; // friendship with the PipeGraph class
357357
reduce_func_t func; // functional logic used by the Reduce
358358
key_extractor_func_t key_extr; // logic to extract the key attribute from the tuple_t
359-
using state_t = decltype(get_state_t_Acc(func)); // extracting the state_t type and checking the admissible signatures
359+
using state_t = decltype(get_state_t_Reduce(func)); // extracting the state_t type and checking the admissible signatures
360360
size_t parallelism; // parallelism of the Reduce
361361
std::string name; // name of the Reduce
362362
bool input_batching; // if true, the Reduce expects to receive batches instead of individual inputs

0 commit comments

Comments
 (0)