Skip to content

Commit a85048d

Browse files
authored
Merge pull request #21 from DropB1t/interval-join
[ADD] Interval Join Operator Implementation
2 parents cbeb657 + 9832f8a commit a85048d

32 files changed

+4076
-53
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
# .DS_Store are banished
2-
.DS_Store
2+
.DS_Store

API

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ The corresponding builder needs two parameters (for the lift and combine logics)
158158
* Combine
159159
__host__ __device__ void(const result_t &, const result_t &, result_t &);
160160

161+
Interval_Join
162+
----------------
163+
std::optional<result_t> (const tuple_t &, const tuple_t &)
164+
std::optional<result_t> (const tuple_t &, const tuple_t &, RuntimeContext &)
165+
161166
SINK
162167
----
163168
void(std::optional<tuple_t> &);

docs/windflow-doxygen.conf

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -845,8 +845,9 @@ INPUT = ../wf/doxygen-mainpage.hpp \
845845
../wf/source_shipper.hpp \
846846
../wf/windflow.hpp \
847847
../wf/windflow_gpu.hpp \
848-
../wf/kafka/windflow_kafka.hpp
849-
../wf/rocksdb/windflow_rocksdb.hpp
848+
../wf/kafka/windflow_kafka.hpp \
849+
../wf/rocksdb/windflow_rocksdb.hpp \
850+
../wf/interval_join.hpp
850851

851852
# This tag can be used to specify the character encoding of the source files
852853
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses

tests/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ add_subdirectory(graph_tests)
33
add_subdirectory(merge_tests)
44
add_subdirectory(split_tests)
55
add_subdirectory(win_tests)
6+
add_subdirectory(join_tests)
67

78
# Add custom target for all the CPU tests
89
add_custom_target(all_cpu)
@@ -11,11 +12,11 @@ if(LIBRDKAFKA_FOUND AND LibRDKafka_LIBRARIES)
1112
# Add the sub-folder with Kafka tests
1213
add_subdirectory(kafka_tests)
1314
# Add dependencies to the all_cpu target
14-
add_dependencies(all_cpu graph_tests merge_tests split_tests win_tests kafka_tests)
15+
add_dependencies(all_cpu graph_tests merge_tests split_tests win_tests join_tests kafka_tests)
1516
else()
1617
message(STATUS "librdkafka needs to be installed to generate kafka tests")
1718
# Add dependencies to the all_cpu target
18-
add_dependencies(all_cpu graph_tests merge_tests split_tests win_tests)
19+
add_dependencies(all_cpu graph_tests merge_tests split_tests win_tests join_tests)
1920
endif()
2021

2122
if(ROCKSDB_FOUND AND ROCKSDB_LIBRARIES)

tests/join_tests/CMakeLists.txt

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Compiler and flags
2+
set(CMAKE_CXX_STANDARD 17)
3+
set(CMAKE_CXX_STANDARD_REQUIRED ON)
4+
set(CMAKE_CXX_FLAGS_DEBUG "-g -O0")
5+
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -g -finline-functions")
6+
7+
# Macros to be provided to the compiler
8+
add_definitions(-DFF_BOUNDED_BUFFER)
9+
# -DWF_JOIN_MEASUREMENT to enable the measurement of how uniformerly the tuples are distributed among the joiners
10+
# -DWF_TRACING_ENABLED to enable the tracing with Dashboard
11+
12+
# Header files of WindFlow and FastFlow
13+
include_directories(${PROJECT_SOURCE_DIR}/wf ${ff_root_dir})
14+
15+
# Linking to pthread
16+
# cdt gvc cgraph to enable the tracing with Dashboard
17+
link_libraries(pthread)
18+
19+
# Set output directory
20+
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ../../bin/join_tests)
21+
22+
# Cpp files to be compiled
23+
file(GLOB SOURCES "*.cpp")
24+
25+
# Add a target for each cpp file and a unique target for all the tests in this folder
26+
add_custom_target(join_tests)
27+
28+
foreach(testsourcefile ${SOURCES})
29+
get_filename_component(barename ${testsourcefile} NAME)
30+
string(REPLACE ".cpp" "" testname ${barename})
31+
add_executable(${testname} ${testsourcefile})
32+
add_dependencies(join_tests ${testname})
33+
endforeach(testsourcefile ${SOURCES})

tests/join_tests/join_common.hpp

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/**************************************************************************************
2+
* Copyright (c) 2023- Gabriele Mencagli and Yuriy Rymarchuk
3+
*
4+
* This file is part of WindFlow.
5+
*
6+
* WindFlow is free software dual licensed under the GNU LGPL or MIT License.
7+
* You can redistribute it and/or modify it under the terms of the
8+
* * GNU Lesser General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version
11+
* OR
12+
* * MIT License: https://github.com/ParaGroup/WindFlow/blob/master/LICENSE.MIT
13+
*
14+
* WindFlow is distributed in the hope that it will be useful,
15+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
16+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17+
* GNU Lesser General Public License for more details.
18+
* You should have received a copy of the GNU Lesser General Public License and
19+
* the MIT License along with WindFlow. If not, see <http://www.gnu.org/licenses/>
20+
* and <http://opensource.org/licenses/MIT/>.
21+
**************************************************************************************
22+
*/
23+
24+
/*
25+
* Data types and operator functors used by the join tests.
26+
*/
27+
28+
// includes
29+
#include<cmath>
30+
#include<string>
31+
32+
using namespace std;
33+
using namespace wf;
34+
35+
// Global variable for the result
36+
atomic<long> global_sum;
37+
38+
// Struct of the input tuple
39+
struct tuple_t
40+
{
41+
size_t key;
42+
int64_t value;
43+
};
44+
45+
#if 1
46+
template<>
47+
struct std::hash<tuple_t>
48+
{
49+
size_t operator()(const tuple_t &t) const
50+
{
51+
size_t h1 = std::hash<int64_t>()(t.value);
52+
size_t h2 = std::hash<size_t>()(t.key);
53+
return h1 ^ h2;
54+
}
55+
};
56+
#endif
57+
58+
struct res_t
59+
{
60+
size_t key;
61+
int64_t value;
62+
size_t from;
63+
};
64+
65+
// Source functor for generating positive numbers
66+
class Source_Positive_Functor
67+
{
68+
private:
69+
size_t len; // stream length per key
70+
size_t keys; // number of keys
71+
uint64_t next_ts; // next timestamp
72+
bool generateWS; // true if watermarks must be generated
73+
74+
public:
75+
// Constructor
76+
Source_Positive_Functor(size_t _len,
77+
size_t _keys,
78+
bool _generateWS):
79+
len(_len),
80+
keys(_keys),
81+
next_ts(0),
82+
generateWS(_generateWS) {}
83+
84+
// operator()
85+
void operator()(Source_Shipper<tuple_t> &shipper)
86+
{
87+
static thread_local std::mt19937 generator;
88+
generator.seed(1234);
89+
std::uniform_int_distribution<int> distribution(0, 250);
90+
for (size_t i=1; i<=len; i++) { // generation loop
91+
for (size_t k=1; k<=keys; k++) {
92+
tuple_t t;
93+
t.key = k;
94+
t.value = i;
95+
shipper.pushWithTimestamp(std::move(t), next_ts);
96+
if (generateWS) {
97+
shipper.setNextWatermark(next_ts);
98+
}
99+
auto offset = (distribution(generator)+1);
100+
next_ts += offset*1000; // in ms
101+
}
102+
}
103+
}
104+
};
105+
106+
// Source functor for generating negative numbers
107+
class Source_Negative_Functor
108+
{
109+
private:
110+
size_t len; // stream length per key
111+
size_t keys; // number of keys
112+
vector<int> values; // list of values
113+
uint64_t next_ts; // next timestamp
114+
bool generateWS; // true if watermarks must be generated
115+
116+
public:
117+
// Constructor
118+
Source_Negative_Functor(size_t _len,
119+
size_t _keys,
120+
bool _generateWS):
121+
len(_len),
122+
keys(_keys),
123+
values(_keys, 0),
124+
next_ts(0),
125+
generateWS(_generateWS) {}
126+
127+
// operator()
128+
void operator()(Source_Shipper<tuple_t> &shipper)
129+
{
130+
static thread_local std::mt19937 generator;
131+
generator.seed(4321);
132+
std::uniform_int_distribution<int> distribution(0, 250);
133+
for (size_t i=1; i<=len; i++) { // generation loop
134+
for (size_t k=1; k<=keys; k++) {
135+
values[k-1]--;
136+
tuple_t t;
137+
t.key = k;
138+
t.value = values[k-1];
139+
shipper.pushWithTimestamp(std::move(t), next_ts);
140+
if (generateWS) {
141+
shipper.setNextWatermark(next_ts);
142+
}
143+
auto offset = (distribution(generator)+1);
144+
next_ts += offset*1000; // in ms
145+
}
146+
}
147+
}
148+
};
149+
150+
// Map functor
151+
class Map_Functor
152+
{
153+
public:
154+
// operator()
155+
void operator()(tuple_t &t)
156+
{
157+
t.value = t.value + 2;
158+
}
159+
};
160+
161+
// Join functor
162+
class Join_Functor
163+
{
164+
public:
165+
// operator()
166+
optional<tuple_t> operator()(const tuple_t &a, const tuple_t &b, RuntimeContext &rc)
167+
{
168+
tuple_t out;
169+
out.value = a.value * b.value;
170+
out.key = a.key;
171+
return out;
172+
}
173+
};
174+
175+
// Distinct Join functor
176+
class Distinct_Join_Functor
177+
{
178+
public:
179+
// operator()
180+
optional<tuple_t> operator()(const tuple_t &a, const tuple_t &b)
181+
{
182+
if (a.value != b.value) {
183+
tuple_t out;
184+
out.value = a.value * b.value;
185+
out.key = a.key;
186+
return out;
187+
}
188+
return {};
189+
}
190+
};
191+
192+
// Filter functor with keyby distribution
193+
class Filter_Functor_KB
194+
{
195+
private:
196+
int mod;
197+
198+
public:
199+
// constructor
200+
Filter_Functor_KB(int _mod): mod(_mod) {}
201+
202+
// operator()
203+
bool operator()(tuple_t &t, RuntimeContext &rc)
204+
{
205+
assert(t.key % rc.getParallelism() == rc.getReplicaIndex());
206+
if (t.value % mod == 0) {
207+
return true;
208+
}
209+
else {
210+
return false;
211+
}
212+
}
213+
};
214+
215+
// Filter functor
216+
class Filter_Functor
217+
{
218+
private:
219+
int mod;
220+
221+
public:
222+
// constructor
223+
Filter_Functor(int _mod): mod(_mod) {}
224+
225+
// operator()
226+
bool operator()(tuple_t &t)
227+
{
228+
if (t.value % mod == 0) {
229+
return true;
230+
}
231+
else {
232+
return false;
233+
}
234+
}
235+
};
236+
237+
// FlatMap functor
238+
class FlatMap_Functor
239+
{
240+
public:
241+
// operator()
242+
void operator()(const tuple_t &t, Shipper<tuple_t> &shipper)
243+
{
244+
for (size_t i=0; i<2; i++) {
245+
shipper.push(t);
246+
}
247+
}
248+
};
249+
250+
// Sink functor
251+
class Sink_Functor
252+
{
253+
private:
254+
size_t received; // counter of received results
255+
long totalsum;
256+
257+
public:
258+
// Constructor
259+
Sink_Functor():
260+
received(0),
261+
totalsum(0) {}
262+
263+
// operator()
264+
void operator()(optional<tuple_t> &out, RuntimeContext &rc)
265+
{
266+
if (out) {
267+
received++;
268+
totalsum += (*out).value;
269+
size_t key = (*out).key;
270+
int64_t value = (*out).value;
271+
}
272+
else {
273+
global_sum.fetch_add(totalsum);
274+
}
275+
}
276+
};

0 commit comments

Comments
 (0)