Skip to content

Commit bb8d302

Browse files
committed
[FIX] Fixing of persistent operators
1 parent a237dbe commit bb8d302

File tree

6 files changed

+30
-39
lines changed

6 files changed

+30
-39
lines changed

tests/rocksdb_tests/rocksdb_common.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ class WinSink_Functor
467467
if (out) {
468468
received++;
469469
totalsum += (*out).value;
470-
//std::cout << "Ricevuto risultato finestra wid: " << (*out).wid << ", chiave: " << (*out).key << ", valore: " << (*out).value << std::endl;
470+
std::cout << "Ricevuto risultato finestra wid: " << (*out).wid << ", chiave: " << (*out).key << ", valore: " << (*out).value << std::endl;
471471
}
472472
else {
473473
// printf("Received: %ld results, total sum: %ld\n", received, totalsum);

tests/rocksdb_tests/test_rocksdb_6.cpp

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,6 @@ int main(int argc, char *argv[])
107107
cout << "| | (" << source_degree << ") +-->+ (" << filter_degree << ") +-->+ (" << flatmap_degree << ") +-->+ (" << map_degree << ") +-->+ (" << kw_degree << ") +-->+ (" << sink_degree << ") | |" << endl;
108108
cout << "| +-----+ +-----+ +------+ +-----+ +--------+ +-----+ |" << endl;
109109
cout << "+-----------------------------------------------------------------+" << endl;
110-
auto tuple_serializer = [](tuple_t &t) -> std::string {
111-
return std::to_string(t.key) + "," + std::to_string(t.value);
112-
};
113-
auto tuple_deserializer = [](std::string &s) -> tuple_t {
114-
tuple_t t;
115-
t.key = atoi(s.substr(0, s.find(",")).c_str());
116-
t.value = atoi(s.substr(s.find(",")+1, s.length()-1).c_str());
117-
return t;
118-
};
119110
auto result_serializer = [](result_t &r) -> std::string {
120111
return std::to_string(r.key) + "," + std::to_string(r.value) + ";" + std::to_string(r.wid);
121112
};
@@ -160,7 +151,6 @@ int main(int argc, char *argv[])
160151
.withParallelism(kw_degree)
161152
.withKeyBy([](const tuple_t &t) -> size_t { return t.key; })
162153
.withCBWindows(win_len, win_slide)
163-
.withTupleSerializerAndDeserializer(tuple_serializer, tuple_deserializer)
164154
.withResultSerializerAndDeserializer(result_serializer, result_deserializer)
165155
.build();
166156
#else

tests/rocksdb_tests/test_rocksdb_8.cpp

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,6 @@ int main(int argc, char *argv[])
108108
cout << "| | (" << source_degree << ") +-->+ (" << filter_degree << ") +-->+ (" << flatmap_degree << ") +-->+ (" << map_degree << ") +-->+ (" << kw_degree << ") +-->+ (" << sink_degree << ") | |" << endl;
109109
cout << "| +-----+ +-----+ +------+ +-----+ +--------+ +-----+ |" << endl;
110110
cout << "+-----------------------------------------------------------------+" << endl;
111-
auto tuple_serializer = [](tuple_t &t) -> std::string {
112-
return std::to_string(t.key) + "," + std::to_string(t.value);
113-
};
114-
auto tuple_deserializer = [](std::string &s) -> tuple_t {
115-
tuple_t t;
116-
t.key = atoi(s.substr(0, s.find(",")).c_str());
117-
t.value = atoi(s.substr(s.find(",")+1, s.length()-1).c_str());
118-
return t;
119-
};
120111
auto result_serializer = [](result_t &r) -> std::string {
121112
return std::to_string(r.key) + "," + std::to_string(r.value) + ";" + std::to_string(r.wid);
122113
};
@@ -165,7 +156,6 @@ int main(int argc, char *argv[])
165156
.withParallelism(kw_degree)
166157
.withKeyBy([](const tuple_t &t) -> size_t { return t.key; })
167158
.withTBWindows(microseconds(win_len), microseconds(win_slide))
168-
.withTupleSerializerAndDeserializer(tuple_serializer, tuple_deserializer)
169159
.withResultSerializerAndDeserializer(result_serializer, result_deserializer)
170160
.build();
171161
#else

wf/persistent/builders_rocksdb.hpp

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1273,7 +1273,7 @@ class P_Keyed_Windows_Builder: public P_Basic_Builder<P_Keyed_Windows_Builder, w
12731273
keyextr_func_t key_extr = [](const tuple_t &t) -> key_t { return key_t(); }; // key extractor
12741274
bool isKeyBySet = false; // true if a key extractor has been provided
12751275
size_t frag_bytes = sizeof(tuple_t) * 16; // size in bytes of each archive fragment of the stream
1276-
bool results_in_memory = true; // flag stating if results must be kepts on memory or on RocksDB
1276+
bool results_in_memory = true; // flag stating if results must be kepts in memory or on RocksDB
12771277
bool isTupleFunctions = false; // flag stating if the tuple serializer/deserializer have been provided
12781278
bool isResultFunctions = false; // flag stating if the result serializer/deserializer have been provided
12791279
uint64_t win_len=0; // window length in number of tuples or in time units
@@ -1428,6 +1428,7 @@ class P_Keyed_Windows_Builder: public P_Basic_Builder<P_Keyed_Windows_Builder, w
14281428
result_serialize = _result_serialize;
14291429
result_deserialize = _result_deserialize;
14301430
isResultFunctions = true;
1431+
results_in_memory = false;
14311432
return *this;
14321433
}
14331434

@@ -1452,6 +1453,25 @@ class P_Keyed_Windows_Builder: public P_Basic_Builder<P_Keyed_Windows_Builder, w
14521453
std::cerr << RED << "WindFlow Error: P_Keyed_Windows with paralellism > 1 requires a key extractor" << DEFAULT_COLOR << std::endl;
14531454
exit(EXIT_FAILURE);
14541455
}
1456+
// check the presence of the tuple/result serializer/deserializer according to type of the window processing logic
1457+
if constexpr (std::is_invocable<decltype(func), const Iterable<tuple_t> &, result_t &>::value ||
1458+
std::is_invocable<decltype(func), const Iterable<tuple_t> &, result_t &, RuntimeContext &>::value) {
1459+
if (!isTupleFunctions) {
1460+
std::cerr << RED << "WindFlow Error: P_Keyed_Windows instantiated with a non-incremental logic without tuple serializer/serializer" << DEFAULT_COLOR << std::endl;
1461+
exit(EXIT_FAILURE);
1462+
}
1463+
}
1464+
else if constexpr (std::is_invocable<decltype(func), const tuple_t &, result_t &>::value ||
1465+
std::is_invocable<decltype(func), const tuple_t &, result_t &, RuntimeContext &>::value) {
1466+
if (isTupleFunctions) {
1467+
std::cerr << RED << "WindFlow Error: P_Keyed_Windows receives tuple serializer/deserializer with an incremental logic" << DEFAULT_COLOR << std::endl;
1468+
exit(EXIT_FAILURE);
1469+
}
1470+
if (!isResultFunctions) {
1471+
std::cerr << RED << "WindFlow Error: P_Keyed_Windows instantiated with an incremental logic without result serializer/serializer" << DEFAULT_COLOR << std::endl;
1472+
exit(EXIT_FAILURE);
1473+
}
1474+
}
14551475
return p_keyed_wins_t(func,
14561476
key_extr,
14571477
this->parallelism,

wf/persistent/p_window_replica.hpp

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ class P_Window_Replica: public Basic_Replica
230230
return final_range;
231231
}
232232

233+
#if 0
233234
// getDistance method
234235
size_t getDistance(const wrapper_t &_w1,
235236
const wrapper_t &_w2,
@@ -254,6 +255,7 @@ class P_Window_Replica: public Basic_Replica
254255
its.second = history_buffer.end();
255256
return std::distance(its.first, its.second);
256257
}
258+
#endif
257259

258260
// getEnd method
259261
input_iterator_t getEnd(Key_Descriptor &_kd)
@@ -336,17 +338,6 @@ class P_Window_Replica: public Basic_Replica
336338
delete mydb_results;
337339
}
338340

339-
/*
340-
template<typename X>
341-
rocksdb::DB *get_operator_internal_db()
342-
{
343-
if (std::is_same<X, tuple_t>::value) {
344-
return mydb_wrappers->get_internal_db();
345-
}
346-
return mydb_results->get_internal_db();
347-
}
348-
*/
349-
350341
// svc (utilized by the FastFlow runtime)
351342
void *svc(void *_in) override
352343
{
@@ -504,9 +495,9 @@ class P_Window_Replica: public Basic_Replica
504495
(this->context).setContextParameters(_timestamp, _watermark); // set the parameter of the RuntimeContext
505496
func(iter, res, this->context);
506497
}
507-
}
508-
if (t_s) { // purge tuples from the archive
509-
purge(*t_s, key_d, key);
498+
if (t_s) { // purge tuples from the archive
499+
purge(*t_s, key_d, key);
500+
}
510501
}
511502
cnt_fired++;
512503
key_d.last_lwid++;

wf/window_replica.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -322,9 +322,9 @@ class Window_Replica: public Basic_Replica
322322
(this->context).setContextParameters(_timestamp, _watermark); // set the parameter of the RuntimeContext
323323
func(iter, win.getResult(), this->context);
324324
}
325-
}
326-
if (t_s) { // purge tuples from the archive
327-
(key_d.archive).purge(*t_s);
325+
if (t_s) { // purge tuples from the archive
326+
(key_d.archive).purge(*t_s);
327+
}
328328
}
329329
cnt_fired++;
330330
key_d.last_lwid++;

0 commit comments

Comments
 (0)