Skip to content

Commit 56d4fc5

Browse files
committed
fix cmakelist.txt
1 parent 62976bc commit 56d4fc5

15 files changed

+251
-98
lines changed

.gitmodules

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,3 @@
1313
[submodule "flex/third_party/parallel-hashmap"]
1414
path = flex/third_party/parallel-hashmap
1515
url = https://github.com/greg7mdp/parallel-hashmap.git
16-
[submodule "flex/third_party/cppkafka"]
17-
path = flex/third_party/cppkafka
18-
url = https://github.com/mfontanini/cppkafka.git

flex/CMakeLists.txt

Lines changed: 15 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,21 @@ if (NOT yaml-cpp_FOUND)
122122
message(FATAL_ERROR "yaml-cpp not found, please install the yaml-cpp library")
123123
endif ()
124124

125+
#find CppKafka-------------------------------------------------------------------
126+
if (BUILD_KAFKA_WAL_WRITER_PARSER)
127+
find_package(CppKafka)
128+
if (NOT CppKafka_FOUND)
129+
message(FATAL_ERROR "cppkafka not found, please install cppkafka library")
130+
else()
131+
include_directories(SYSTEM ${CppKafka_INCLUDE_DIRS})
132+
message(STATUS "cpp kafka include dir: ${CppKafka_INCLUDE_DIRS}")
133+
set(CppKafka_LIBRARIES CppKafka::cppkafka)
134+
add_definitions(-DBUILD_KAFKA_WAL_WRITER_PARSER)
135+
message(STATUS "cppkafka found")
136+
endif ()
137+
endif()
138+
139+
125140
#find boost----------------------------------------------------------------------
126141
find_package(Boost REQUIRED COMPONENTS system filesystem
127142
# required by folly
@@ -166,38 +181,6 @@ endif ()
166181
find_package(Protobuf REQUIRED)
167182
include_directories(${Protobuf_INCLUDE_DIRS})
168183

169-
#find CppKafka-------------------------------------------------------------------
170-
if (BUILD_KAFKA_WAL_WRITER_PARSER)
171-
find_package(CppKafka)
172-
if (NOT CppKafka_FOUND)
173-
message(STATUS "cppkafka not found, try to build with third_party/cppkafka")
174-
set(CPPKAFKA_BOOST_STATIC_LIBS OFF)
175-
add_subdirectory(third_party/cppkafka)
176-
if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/CMakeLists.txt)
177-
message(FATAL_ERROR "cppkafka not found, please run git submodule update --init --recursive")
178-
endif ()
179-
# Apply third_party/cppkafka_without_boost.patch
180-
# to remove the dependency of boost in cppkafka
181-
# run git apply ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka_without_boost.patch
182-
add_custom_target(apply_patch
183-
COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka_without_boost.patch || true
184-
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka
185-
COMMENT "Applying patch to cppkafka"
186-
VERBATIM)
187-
188-
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/include)
189-
set (CppKafka_INSTALL_DIR ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/build)
190-
list (APPEND CMAKE_PREFIX_PATH ${CppKafka_INSTALL_DIR})
191-
set(CppKafka_LIBRARIES cppkafka)
192-
else()
193-
include_directories(SYSTEM ${CppKafka_INCLUDE_DIRS})
194-
message(STATUS "cpp kafka include dir: ${CppKafka_INCLUDE_DIRS}")
195-
set(CppKafka_LIBRARIES CppKafka::cppkafka)
196-
endif ()
197-
add_definitions(-DBUILD_KAFKA_WAL_WRITER_PARSER)
198-
message(STATUS "cppkafka found")
199-
endif()
200-
201184
if (BUILD_WITH_OTEL)
202185
find_package(opentelemetry-cpp CONFIG)
203186
if (OPENTELEMETRY_CPP_FOUND)

flex/engines/graph_db/CMakeLists.txt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
44
"${CMAKE_CURRENT_SOURCE_DIR}/database/wal/*.cc"
55
"${CMAKE_CURRENT_SOURCE_DIR}/app/builtin/*.cc")
66

7-
if (BUILD_KAFKA_WAL_WRITER_PARSER)
8-
list(APPEND GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/database/wal/kafka_wal_writer.cc"
9-
"${CMAKE_CURRENT_SOURCE_DIR}/database/wal/kafka_wal_parser.cc")
7+
if (NOT BUILD_KAFKA_WAL_WRITER_PARSER)
8+
list(REMOVE_ITEM GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/database/wal/kafka_wal_writer.cc"
9+
"${CMAKE_CURRENT_SOURCE_DIR}/database/wal/kafka_wal_parser.cc")
1010
endif()
1111

1212
add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})
@@ -17,7 +17,6 @@ target_link_libraries(flex_graph_db runtime_execute)
1717
if (BUILD_KAFKA_WAL_WRITER_PARSER)
1818
target_link_libraries(flex_graph_db ${CppKafka_LIBRARIES})
1919
endif()
20-
add_dependencies(flex_graph_db apply_patch)
2120
install_flex_target(flex_graph_db)
2221

2322
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/** Copyright 2020 Alibaba Group Holding Limited.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
#include "flex/engines/graph_db/app/kafka_wal_ingester_app.h"
17+
#include "cppkafka/cppkafka.h"
18+
#include "flex/engines/graph_db/database/graph_db.h"
19+
#include "flex/engines/graph_db/database/wal/kafka_wal_parser.h"
20+
21+
namespace gs {
22+
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
23+
24+
class KafkaWalConsumer {
25+
public:
26+
struct CustomComparator {
27+
inline bool operator()(const std::string& lhs, const std::string& rhs) {
28+
const WalHeader* header1 = reinterpret_cast<const WalHeader*>(lhs.data());
29+
const WalHeader* header2 = reinterpret_cast<const WalHeader*>(rhs.data());
30+
return header1->timestamp > header2->timestamp;
31+
}
32+
};
33+
static constexpr const std::chrono::milliseconds POLL_TIMEOUT =
34+
std::chrono::milliseconds(100);
35+
36+
// always track all partitions and from begining
37+
KafkaWalConsumer(cppkafka::Configuration config,
38+
const std::string& topic_name, int32_t thread_num) {
39+
auto topic_partitions = get_all_topic_partitions(config, topic_name);
40+
consumers_.reserve(topic_partitions.size());
41+
for (size_t i = 0; i < topic_partitions.size(); ++i) {
42+
consumers_.emplace_back(std::make_unique<cppkafka::Consumer>(config));
43+
consumers_.back()->assign({topic_partitions[i]});
44+
}
45+
}
46+
47+
std::string poll() {
48+
for (auto& consumer : consumers_) {
49+
auto msg = consumer->poll();
50+
if (msg) {
51+
if (msg.get_error()) {
52+
if (!msg.is_eof()) {
53+
LOG(INFO) << "[+] Received error notification: " << msg.get_error();
54+
}
55+
} else {
56+
std::string payload = msg.get_payload();
57+
message_queue_.push(payload);
58+
consumer->commit(msg);
59+
}
60+
}
61+
}
62+
if (message_queue_.empty()) {
63+
return "";
64+
}
65+
std::string payload = message_queue_.top();
66+
message_queue_.pop();
67+
return payload;
68+
}
69+
70+
private:
71+
std::vector<std::unique_ptr<cppkafka::Consumer>> consumers_;
72+
std::priority_queue<std::string, std::vector<std::string>, CustomComparator>
73+
message_queue_;
74+
};
75+
76+
bool KafkaWalIngesterApp::Query(GraphDBSession& graph, Decoder& input,
77+
Encoder& output) {
78+
// TODO: get value from config
79+
std::string kafka_brokers = std::string(input.get_string());
80+
std::string brokers = std::string(input.get_string());
81+
std::string topic_name = std::string(input.get_string());
82+
std::string group_id = std::string(input.get_string());
83+
std::string engine_endpoint = std::string(input.get_string());
84+
LOG(INFO) << "Kafka brokers: " << kafka_brokers;
85+
LOG(INFO) << "engine endpoint: " << engine_endpoint;
86+
87+
cppkafka::Configuration config = {{"metadata.broker.list", kafka_brokers},
88+
{"group.id", group_id},
89+
// Disable auto commit
90+
{"enable.auto.commit", false}};
91+
gs::KafkaWalConsumer consumer(config, topic_name, 1);
92+
// TODO: how to make it stop
93+
while (!graph.db().kafka_wal_ingester_state()) {
94+
auto res = consumer.poll();
95+
if (res.empty()) {
96+
std::this_thread::sleep_for(gs::KafkaWalConsumer::POLL_TIMEOUT);
97+
continue;
98+
}
99+
100+
auto header = reinterpret_cast<const WalHeader*>(res.data());
101+
if (header->type == 0) {
102+
auto txn = graph.GetInsertTransaction();
103+
txn.IngestWal(graph.graph(), txn.timestamp(),
104+
const_cast<char*>(res.data()) + sizeof(WalHeader),
105+
header->length, txn.allocator());
106+
} else if (header->type == 1) {
107+
auto txn = graph.GetUpdateTransaction();
108+
txn.IngestWal(graph.graph(), graph.db().work_dir(), txn.timestamp(),
109+
const_cast<char*>(res.data()) + sizeof(WalHeader),
110+
header->length, txn.allocator());
111+
} else {
112+
LOG(ERROR) << "Unknown wal type: " << header->type;
113+
}
114+
}
115+
return true;
116+
}
117+
AppWrapper KafkaWalIngesterAppFactory::CreateApp(const GraphDB& db) {
118+
return AppWrapper(new KafkaWalIngesterApp(), NULL);
119+
}
120+
#endif
121+
} // namespace gs
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/** Copyright 2020 Alibaba Group Holding Limited.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
#ifndef ENGINES_KAFKA_WAL_INGESTER_APP_H_
17+
#define ENGINES_KAFKA_WAL_INGESTER_APP_H_
18+
19+
#include "flex/engines/graph_db/app/app_base.h"
20+
#include "flex/engines/graph_db/database/graph_db_session.h"
21+
22+
namespace gs {
23+
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
24+
// Ingest wal from kafka
25+
class KafkaWalIngesterApp : public WriteAppBase {
26+
public:
27+
KafkaWalIngesterApp() {}
28+
29+
AppType type() const override { return AppType::kBuiltIn; }
30+
31+
bool Query(GraphDBSession& graph, Decoder& input, Encoder& output) override;
32+
};
33+
34+
class KafkaWalIngesterAppFactory : public AppFactoryBase {
35+
public:
36+
KafkaWalIngesterAppFactory() = default;
37+
~KafkaWalIngesterAppFactory() = default;
38+
39+
AppWrapper CreateApp(const GraphDB& db) override;
40+
};
41+
#endif
42+
43+
} // namespace gs
44+
45+
#endif // ENGINES_KAFKA_WAL_INGESTER_APP_H_

flex/engines/graph_db/database/graph_db.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
#include "flex/engines/graph_db/runtime/utils/cypher_runner_impl.h"
2929
#include "flex/utils/yaml_utils.h"
3030

31+
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
32+
#include "flex/engines/graph_db/app/kafka_wal_ingester_app.h"
33+
#endif
34+
3135
#include "flex/third_party/httplib.h"
3236

3337
namespace gs {
@@ -274,6 +278,38 @@ void GraphDB::Close() {
274278
std::fill(app_factories_.begin(), app_factories_.end(), nullptr);
275279
}
276280

281+
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
282+
bool GraphDB::kafka_wal_ingester_state() const {
283+
return kafka_wal_ingester_thread_running_.load();
284+
}
285+
286+
void GraphDB::start_kafka_wal_ingester(const std::string& kafka_brokers,
287+
const std::string& brokers,
288+
const std::string& topic_name,
289+
const std::string& group_id,
290+
const std::string& engine_endpoint) {
291+
if (kafka_wal_ingester_thread_running_) {
292+
kafka_wal_ingester_thread_running_ = false;
293+
kafka_wal_ingester_thread_.join();
294+
}
295+
std::vector<char> buffer;
296+
gs::Encoder encoder(buffer);
297+
encoder.put_string(kafka_brokers);
298+
encoder.put_string(brokers);
299+
encoder.put_string(topic_name);
300+
encoder.put_string(group_id);
301+
encoder.put_string(engine_endpoint);
302+
gs::Decoder decoder(buffer.data(), buffer.size());
303+
KafkaWalIngesterApp().Query(GetSession(0), decoder, encoder);
304+
}
305+
306+
void GraphDB::stop_kafka_wal_ingester() {
307+
kafka_wal_ingester_thread_running_ = false;
308+
kafka_wal_ingester_thread_.join();
309+
}
310+
311+
#endif
312+
277313
ReadTransaction GraphDB::GetReadTransaction(int thread_id) {
278314
return contexts_[thread_id].session.GetReadTransaction();
279315
}

flex/engines/graph_db/database/graph_db.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,18 @@ class GraphDB {
169169

170170
inline const GraphDBConfig& config() const { return config_; }
171171

172+
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
173+
bool kafka_wal_ingester_state() const;
174+
175+
void start_kafka_wal_ingester(const std::string& kafka_brokers,
176+
const std::string& brokers,
177+
const std::string& topic_name,
178+
const std::string& group_id,
179+
const std::string& engine_endpoint);
180+
181+
void stop_kafka_wal_ingester();
182+
#endif
183+
172184
private:
173185
bool registerApp(const std::string& path, uint8_t index = 0);
174186

@@ -204,6 +216,9 @@ class GraphDB {
204216
std::thread monitor_thread_;
205217
bool monitor_thread_running_;
206218

219+
std::thread kafka_wal_ingester_thread_;
220+
std::atomic<bool> kafka_wal_ingester_thread_running_;
221+
207222
timestamp_t last_compaction_ts_;
208223
bool compact_thread_running_ = false;
209224
std::thread compact_thread_;

flex/engines/graph_db/database/insert_transaction.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,8 @@ const Schema& InsertTransaction::schema() const { return graph_.schema(); }
226226

227227
const GraphDBSession& InsertTransaction::GetSession() const { return session_; }
228228

229+
Allocator& InsertTransaction::allocator() const { return alloc_; }
230+
229231
#define likely(x) __builtin_expect(!!(x), 1)
230232

231233
bool InsertTransaction::get_vertex_with_retries(MutablePropertyFragment& graph,

flex/engines/graph_db/database/insert_transaction.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ class InsertTransaction {
6161

6262
const GraphDBSession& GetSession() const;
6363

64+
Allocator& allocator() const;
65+
6466
private:
6567
void clear();
6668

flex/engines/graph_db/database/update_transaction.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ UpdateTransaction::~UpdateTransaction() { release(); }
102102

103103
timestamp_t UpdateTransaction::timestamp() const { return timestamp_; }
104104

105+
Allocator& UpdateTransaction::allocator() { return alloc_; }
106+
105107
bool UpdateTransaction::Commit() {
106108
if (timestamp_ == std::numeric_limits<timestamp_t>::max()) {
107109
return true;

flex/engines/graph_db/database/update_transaction.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ class UpdateTransaction {
153153

154154
const GraphDBSession& GetSession() const;
155155

156+
Allocator& allocator();
157+
156158
private:
157159
friend class GraphDBSession;
158160
bool batch_commit(UpdateBatch& batch);

0 commit comments

Comments
 (0)