@@ -78,22 +78,16 @@ class IJoin_Replica: public Basic_Replica
7878 using iterator_t = typename container_t ::iterator; // iterator type for accessing wrapped tuples in the archive
7979 using compare_func_t = std::function<bool (const wrapper_t &, const uint64_t &)>; // function type to compare wrapped tuple to an uint64
8080
81- /* *
82- * @brief Structure to store statistics about an archive.
83- */
84- struct Archive_Stats
81+ struct Archive_Stats // structure to store statistics about an archive
8582 {
86- // Total size of the archive.
87- size_t size;
83+ size_t size; // total size of the archive
84+ uint64_t size_count; // number of times the size of the archive has been recorded
8885
89- // Number of times the size of the archive has been recorded
90- uint64_t size_count;
91-
92- // Default constructor for Archive_Stats
86+ // Constructor
9387 Archive_Stats ():
9488 size (0 ),
9589 size_count (0 ) {}
96-
90+
9791 // Records the size of the archive
9892 void recordSize (uint64_t _size)
9993 {
@@ -114,13 +108,15 @@ class IJoin_Replica: public Basic_Replica
114108 {
115109 JoinArchive<tuple_t , compare_func_t > archiveA; // archive of stream A tuples of this key
116110 JoinArchive<tuple_t , compare_func_t > archiveB; // archive of stream B tuples of this key
117- Archive_Stats archive_metrics;
111+ Archive_Stats archive_metrics; // archive of statistics for this key
112+ uint64_t partitioning_counter; // counter used in DP mode to establish which replica will save the given tuple
118113
119114 // Constructor
120115 Key_Descriptor (compare_func_t _compare_func):
121116 archiveA (_compare_func),
122117 archiveB (_compare_func),
123- archive_metrics (Archive_Stats()) {}
118+ archive_metrics (Archive_Stats()),
119+ partitioning_counter (0 ) {}
124120
125121 // recordSize method
126122 void recordSize ()
@@ -139,21 +135,6 @@ class IJoin_Replica: public Basic_Replica
139135 size_t id_inner; // id_inner value
140136 size_t num_inner; // num_inner value
141137
// Calculates an FNV-1a style hash over the first `len` bytes starting at `key`.
//
// key: pointer to the bytes to hash; must reference at least `len` readable bytes.
// len: number of bytes to hash (defaults to the size of a uint64_t).
// Returns the accumulated hash; when `len` is 0 the initial offset basis is returned unchanged.
//
// NOTE: the offset basis and prime are the 32-bit FNV parameters, but the arithmetic is carried
// out in size_t, so the result is not truncated to 32 bits where size_t is wider.
size_t fnv1a_hash(const void *key,
                  const size_t len = sizeof(uint64_t))
{
    // read the input as unsigned bytes so that values >= 0x80 are never sign-extended
    const unsigned char *bytes = static_cast<const unsigned char *>(key);
    const size_t prime = 0x1000193;
    size_t hash = 0x811c9dc5;
    // the index has the same type as len: no signed/unsigned comparison, no overflow on large lengths
    for (size_t i = 0; i < len; ++i) {
        hash ^= bytes[i]; // FNV-1a order: xor the byte first, then multiply by the prime
        hash *= prime;
    }
    return hash;
}
157138 // Checks if the given Join_Stream_t is Stream A
158139 bool isStreamA (Join_Stream_t stream) const
159140 {
@@ -334,21 +315,9 @@ class IJoin_Replica: public Basic_Replica
334315 insertIntoBuffer (key_d, wrapper_t (_tuple, _timestamp), _tag);
335316 }
336317 else if (joinMode == Join_Mode_t::DP) {
337- if constexpr (if_defined_hash<tuple_t >) {
338- // compute the hash index of the tuple given a defined hash function specialization for the tuple_t
339- size_t hash = std::hash<tuple_t >()(_tuple);
340- size_t hash_idx = (hash % num_inner);
341- if (hash_idx == id_inner) {
342- insertIntoBuffer (key_d, wrapper_t (_tuple, _timestamp), _tag);
343- }
344- }
345- else {
346- // compute the hash index of the tuple using FNV-1a hash function using the timestamp
347- size_t hash = fnv1a_hash (&_timestamp);
348- size_t hash_idx = (hash % num_inner);
349- if (hash_idx == id_inner) {
350- insertIntoBuffer (key_d, wrapper_t (_tuple, _timestamp), _tag);
351- }
318+ key_d.partitioning_counter ++;
319+ if (key_d.partitioning_counter % num_inner == id_inner) {
320+ insertIntoBuffer (key_d, wrapper_t (_tuple, _timestamp), _tag);
352321 }
353322 }
354323 if (this ->execution_mode == Execution_Mode_t::DEFAULT) {
@@ -363,17 +332,6 @@ class IJoin_Replica: public Basic_Replica
363332 last_time = _timestamp;
364333 }
365334 }
366- #if defined (WF_JOIN_MEASUREMENT)
367- // Measure the size of the archives every 200ms
368- uint64_t delta = (current_time_nsecs () - last_measured_size_time) / 1e06 ; // ms
369- if (delta >= 200 ) {
370- for (auto &k: keyMap) {
371- Key_Descriptor &key_m_d = (k.second );
372- (key_m_d.recordSize ());
373- }
374- last_measured_size_time = current_time_nsecs ();
375- }
376- #endif
377335 }
378336
379337 double getArchiveMeanSize () const
@@ -576,61 +534,11 @@ class Interval_Join: public Basic_Operator
576534 // Destructor
// When WF_JOIN_MEASUREMENT is defined and isTerminated() returns true, the per-key archive
// statistics are printed first; afterwards every pointer stored in `replicas` is deleted.
577535 ~Interval_Join () override
578536 {
579- #if defined(WF_JOIN_MEASUREMENT)
580- if (this ->isTerminated ()) {
581- printArchivePerKeyStats ();
582- }
583- #endif
584537 for (auto *r: replicas) { // delete all the replicas
585538 delete r;
586539 }
587540 }
588541
589- /* *
590- * @brief Print statistics of the archive per key.
591- *
592- * This function calculates and prints various statistics about the archive per key.
593- * It calculates the mean archive size for each replica and checks the distribution
594- * of the archive sizes and determines if it is balanced or not.
595- *
596- * @note This function assumes that the `replicas` vector is already populated with
597- * valid replica objects.
598- *
599- * @note The balance check is determined based on the coefficient of variation (cv) value.
600- * If the cv is less than 20, it is considered balanced and marked with a checkmark,
601- * otherwise it is considered unbalanced and marked with a cross.
602- *
603- * @note The function uses the `std::cout` stream to print the statistics.
604- */
605- void printArchivePerKeyStats ()
606- {
607- std::cout << " ***" << std::endl;
608- std::cout << " Archive Stats: " << std::endl;
609- uint64_t num_replicas = replicas.size ();
610- double acc_mean = 0.0 ;
611- int i = 0 ;
612- for (auto *r: replicas) {
613- auto mean = r->getArchiveMeanSize ();
614- std::cout << (i+1 ) << " Replica mean -> " << mean << std::endl;
615- acc_mean += mean;
616- i++;
617- }
618- double mean_size = acc_mean / num_replicas;
619- double size_in_mb = mean_size * sizeof (tuple_t ) / 1024 ;
620- std::cout << " Global Mean Archive Size -> " << mean_size << " | " << size_in_mb << " KB" << std::endl;
621- // Check distribution
622- double variance = 0 ;
623- for (auto *r: replicas) {
624- variance += std::pow (r->getArchiveMeanSize () - mean_size, 2 );
625- }
626- variance /= num_replicas;
627- double cv = variance != 0 ? sqrt (variance) / mean_size * 100 : 0.0 ; // coefficient of variation
628- std::string check_balance = cv < 20 ? " ✔ " : " ✘ " ;
629- std::cout << std::fixed << std::setprecision (2 );
630- std::cout << " Variance -> " << variance << " | Coefficient of variation -> " << cv << " | Balanced Distribution ->" << check_balance << std::endl;
631- std::cout << " ***" << std::endl;
632- }
633-
634542 /* *
635543 * \brief Get the type of the Interval Join as a string
636544 * \return type of the Interval Join
0 commit comments