
Commit 62976bc

siyuan0322 authored and zhanglei1949 committed
fix(interactive): disable repair wheel in coordinator dockerfile (#4517)
Fixes #4454

Signed-off-by: siyuan0322 <[email protected]>

Squashed commit messages (Committed-by: [email protected] and xiaolei.zl from Dev container):
- fix aocc
- fix installation
- fix linking, todo: verify correctness
- impl kafka writer and parser
- minor fix
- fix
1 parent 08ad5bf commit 62976bc

File tree: 13 files changed, +624 −3 lines


.github/workflows/interactive.yml

Lines changed: 17 additions & 0 deletions

@@ -377,6 +377,23 @@ jobs:
           bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} graph_algo \
             ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
+      - name: Run kafka wal test
+        env:
+          GS_TEST_DIR: ${{ github.workspace }}/gstest
+        run: |
+          wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
+          tar -zxf kafka_2.13-3.9.0.tgz
+          cd kafka_2.13-3.9.0
+          KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
+          bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
+          bin/kafka-server-start.sh config/kraft/reconfig-server.properties &
+
+          bin/kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
+
+          ./tests/hqps/kafka_test localhost:9092 kafka-test
+
+          pkill -f kafka-server-start.sh
       - name: Run Gremlin test on modern graph
         env:
           GS_TEST_DIR: ${{ github.workspace }}/gstest

.gitmodules

Lines changed: 3 additions & 0 deletions

@@ -13,3 +13,6 @@
 [submodule "flex/third_party/parallel-hashmap"]
 	path = flex/third_party/parallel-hashmap
 	url = https://github.com/greg7mdp/parallel-hashmap.git
+[submodule "flex/third_party/cppkafka"]
+	path = flex/third_party/cppkafka
+	url = https://github.com/mfontanini/cppkafka.git

docs/flex/interactive/development/dev_and_test.md

Lines changed: 37 additions & 0 deletions

@@ -262,3 +262,40 @@ In Interactive's execution engine, transactions such as `ReadTransaction`, `Upda
 2. If a transaction returns `false` during the `commit()` process, the error occurred prior to applying the WAL to the graph data. This type of failure could arise during the construction of the WAL or during its writing phase.
 
 3. It is important to note that errors can still occur when replaying the WAL to the graph database. Replaying might fail due to limitations in resources or due to unforeseen bugs. **However,** any errors encountered during this stage will be handled via exceptions or may result in process failure. Currently, there is no established mechanism to handle such failures. Future improvements should focus on implementing failover strategies, potentially allowing the GraphDB to continue replaying the WAL until it succeeds.
+
+## Persisting WAL to Kafka
+
+Kafka-based WAL storage is also provided. The setup below follows the [Kafka quickstart](https://kafka.apache.org/quickstart).
+
+### Install Kafka
+
+```bash
+wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
+tar -zxf kafka_2.13-3.9.0.tgz
+cd kafka_2.13-3.9.0
+```
+
+### Start Kafka with KRaft
+
+```bash
+KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
+bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties
+bin/kafka-server-start.sh config/kraft/reconfig-server.properties
+```
+
+### Create a topic
+
+```bash
+bin/kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
+# describe the topic
+bin/kafka-topics.sh --describe --topic kafka-test --bootstrap-server localhost:9092
+```
+
+### Test KafkaWalWriter and KafkaWalParser
+
+```bash
+cd flex && mkdir build
+cd build && cmake .. -DBUILD_TEST=ON && make -j
+# run the kafka test
+./tests/hqps/kafka_test localhost:9092 kafka-test
+```

flex/CMakeLists.txt

Lines changed: 33 additions & 0 deletions

@@ -17,6 +17,7 @@ option(USE_PTHASH "Whether to use pthash" OFF)
 option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host
 option(USE_STATIC_ARROW "Whether to use static arrow" OFF) # Whether to link arrow statically, default is OFF
 option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF
+option(BUILD_KAFKA_WAL_WRITER_PARSER "Whether to build kafka wal writer and wal parser" ON) # default is ON
 
 #print options
 message(STATUS "Build test: ${BUILD_TEST}")

@@ -165,6 +166,38 @@ endif ()
 find_package(Protobuf REQUIRED)
 include_directories(${Protobuf_INCLUDE_DIRS})
 
+# find CppKafka -------------------------------------------------------------------
+if (BUILD_KAFKA_WAL_WRITER_PARSER)
+  find_package(CppKafka)
+  if (NOT CppKafka_FOUND)
+    message(STATUS "cppkafka not found, try to build with third_party/cppkafka")
+    if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/CMakeLists.txt)
+      message(FATAL_ERROR "cppkafka not found, please run git submodule update --init --recursive")
+    endif ()
+    set(CPPKAFKA_BOOST_STATIC_LIBS OFF)
+    add_subdirectory(third_party/cppkafka)
+    # Apply third_party/cppkafka_without_boost.patch to remove the
+    # boost dependency from cppkafka.
+    add_custom_target(apply_patch
+      COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka_without_boost.patch || true
+      WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka
+      COMMENT "Applying patch to cppkafka"
+      VERBATIM)
+
+    include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/include)
+    set(CppKafka_INSTALL_DIR ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/build)
+    list(APPEND CMAKE_PREFIX_PATH ${CppKafka_INSTALL_DIR})
+    set(CppKafka_LIBRARIES cppkafka)
+  else ()
+    include_directories(SYSTEM ${CppKafka_INCLUDE_DIRS})
+    message(STATUS "cpp kafka include dir: ${CppKafka_INCLUDE_DIRS}")
+    set(CppKafka_LIBRARIES CppKafka::cppkafka)
+  endif ()
+  add_definitions(-DBUILD_KAFKA_WAL_WRITER_PARSER)
+  message(STATUS "cppkafka found")
+endif ()
+
 if (BUILD_WITH_OTEL)
 find_package(opentelemetry-cpp CONFIG)
 if (OPENTELEMETRY_CPP_FOUND)

flex/engines/graph_db/CMakeLists.txt

Lines changed: 9 additions & 0 deletions

@@ -4,11 +4,20 @@ file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc"
     "${CMAKE_CURRENT_SOURCE_DIR}/database/wal/*.cc"
     "${CMAKE_CURRENT_SOURCE_DIR}/app/builtin/*.cc")
 
+if (BUILD_KAFKA_WAL_WRITER_PARSER)
+  list(APPEND GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/database/wal/kafka_wal_writer.cc"
+    "${CMAKE_CURRENT_SOURCE_DIR}/database/wal/kafka_wal_parser.cc")
+endif()
 add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES})
 
 target_include_directories(flex_graph_db PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>)
 target_link_libraries(flex_graph_db flex_rt_mutable_graph ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
 target_link_libraries(flex_graph_db runtime_execute)
+if (BUILD_KAFKA_WAL_WRITER_PARSER)
+  target_link_libraries(flex_graph_db ${CppKafka_LIBRARIES})
+  # apply_patch only exists when cppkafka is built from third_party
+  if (TARGET apply_patch)
+    add_dependencies(flex_graph_db apply_patch)
+  endif()
+endif()
 install_flex_target(flex_graph_db)
 
 install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
flex/engines/graph_db/database/wal/kafka_wal_parser.cc

Lines changed: 154 additions & 0 deletions (new file)

@@ -0,0 +1,154 @@
/** Copyright 2020 Alibaba Group Holding Limited.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * 	http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "flex/engines/graph_db/database/wal/kafka_wal_parser.h"

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#include <filesystem>

#include "flex/engines/graph_db/database/wal/wal.h"

namespace gs {

std::vector<cppkafka::TopicPartition> get_all_topic_partitions(
    const cppkafka::Configuration& config, const std::string& topic_name) {
  std::vector<cppkafka::TopicPartition> partitions;
  cppkafka::Consumer consumer(config);  // temporary consumer
  auto meta_vector = consumer.get_metadata().get_topics({topic_name});
  if (meta_vector.empty()) {
    LOG(WARNING) << "Failed to get metadata for topic " << topic_name
                 << ", maybe the topic does not exist";
    return {};
  }
  auto metadata = meta_vector.front().get_partitions();
  for (const auto& partition : metadata) {
    partitions.push_back(cppkafka::TopicPartition(
        topic_name, partition.get_id(), 0));  // from the beginning
  }
  return partitions;
}

std::unique_ptr<IWalParser> KafkaWalParser::Make() {
  const char* broker_list = std::getenv("KAFKA_BROKER_LIST");
  if (broker_list == nullptr) {
    LOG(FATAL) << "KAFKA_BROKER_LIST is not set";
  }
  const char* group_id = std::getenv("KAFKA_GROUP_ID");
  std::string group_id_str = group_id ? group_id : "interactive_consumer";
  cppkafka::Configuration config = {{"metadata.broker.list", broker_list},
                                    {"group.id", group_id_str}};
  return std::unique_ptr<IWalParser>(new KafkaWalParser(config));
}

KafkaWalParser::KafkaWalParser(const cppkafka::Configuration& config)
    : consumer_(nullptr),
      insert_wal_list_(NULL),
      insert_wal_list_size_(0),
      last_ts_(0),
      config_(config) {
  consumer_ = std::make_unique<cppkafka::Consumer>(config);
}

void KafkaWalParser::open(const std::string& topic_name) {
  auto topic_partitions = get_all_topic_partitions(config_, topic_name);
  open(topic_partitions);
}

void KafkaWalParser::open(
    const std::vector<cppkafka::TopicPartition>& topic_partitions) {
  consumer_->assign(topic_partitions);
  insert_wal_list_ = static_cast<WalContentUnit*>(
      mmap(NULL, IWalWriter::MAX_WALS_NUM * sizeof(WalContentUnit),
           PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE,
           -1, 0));
  insert_wal_list_size_ = IWalWriter::MAX_WALS_NUM;

  while (true) {
    auto msgs = consumer_->poll_batch(MAX_BATCH_SIZE);
    if (msgs.empty()) {
      LOG(INFO) << "No messages polled, the topic has been fully consumed.";
      break;
    }
    for (auto& msg : msgs) {
      if (msg) {
        if (msg.get_error()) {
          if (!msg.is_eof()) {
            LOG(INFO) << "[+] Received error notification: " << msg.get_error();
          }
        } else {
          message_vector_.emplace_back(msg.get_payload());
        }
      }
    }
  }

  for (auto& wal : message_vector_) {
    LOG(INFO) << "Got wal:" << wal.size();
    const char* payload = wal.data();
    const WalHeader* header = reinterpret_cast<const WalHeader*>(payload);
    uint32_t cur_ts = header->timestamp;
    if (cur_ts == 0) {
      LOG(WARNING) << "Invalid timestamp 0, skip";
      continue;
    }
    int length = header->length;
    if (header->type) {
      UpdateWalUnit unit;
      unit.timestamp = cur_ts;
      unit.ptr = const_cast<char*>(payload + sizeof(WalHeader));
      unit.size = length;
      update_wal_list_.push_back(unit);
    } else {
      if (insert_wal_list_[cur_ts].ptr) {
        LOG(WARNING) << "Duplicated timestamp " << cur_ts << ", skip";
      }
      insert_wal_list_[cur_ts].ptr =
          const_cast<char*>(payload + sizeof(WalHeader));
      insert_wal_list_[cur_ts].size = length;
    }
    last_ts_ = std::max(cur_ts, last_ts_);
  }

  LOG(INFO) << "last_ts: " << last_ts_;
  if (!update_wal_list_.empty()) {
    std::sort(update_wal_list_.begin(), update_wal_list_.end(),
              [](const UpdateWalUnit& lhs, const UpdateWalUnit& rhs) {
                return lhs.timestamp < rhs.timestamp;
              });
  }
}

void KafkaWalParser::close() {
  if (consumer_) {
    consumer_.reset();
  }
  if (insert_wal_list_ != NULL) {
    munmap(insert_wal_list_, insert_wal_list_size_ * sizeof(WalContentUnit));
  }
}

uint32_t KafkaWalParser::last_ts() const { return last_ts_; }

const WalContentUnit& KafkaWalParser::get_insert_wal(uint32_t ts) const {
  if (insert_wal_list_[ts].ptr == NULL) {
    LOG(WARNING) << "No WAL for timestamp " << ts;
  }
  return insert_wal_list_[ts];
}

const std::vector<UpdateWalUnit>& KafkaWalParser::get_update_wals() const {
  return update_wal_list_;
}

}  // namespace gs
flex/engines/graph_db/database/wal/kafka_wal_parser.h

Lines changed: 67 additions & 0 deletions (new file)

@@ -0,0 +1,67 @@
/** Copyright 2020 Alibaba Group Holding Limited.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * 	http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef ENGINES_GRAPH_DB_DATABASE_WAL_KAFKA_WAL_PARSER_H_
#define ENGINES_GRAPH_DB_DATABASE_WAL_KAFKA_WAL_PARSER_H_

#include <vector>

#include "flex/engines/graph_db/database/wal/wal.h"

#include "cppkafka/cppkafka.h"

namespace gs {

/*
 * Get all partitions of the given topic.
 */
std::vector<cppkafka::TopicPartition> get_all_topic_partitions(
    const cppkafka::Configuration& config, const std::string& topic_name);

class KafkaWalParser : public IWalParser {
 public:
  static constexpr const std::chrono::milliseconds POLL_TIMEOUT =
      std::chrono::milliseconds(100);
  static constexpr const size_t MAX_BATCH_SIZE = 1000;

  static std::unique_ptr<IWalParser> Make();

  // Always tracks all partitions, consuming from the beginning.
  KafkaWalParser(const cppkafka::Configuration& config);
  ~KafkaWalParser() { close(); }

  void open(const std::string& topic_name) override;
  void close() override;

  uint32_t last_ts() const override;
  const WalContentUnit& get_insert_wal(uint32_t ts) const override;
  const std::vector<UpdateWalUnit>& get_update_wals() const override;

  // Kafka-specific methods
  void open(const std::vector<cppkafka::TopicPartition>& partitions);

 private:
  std::unique_ptr<cppkafka::Consumer> consumer_;
  WalContentUnit* insert_wal_list_;
  size_t insert_wal_list_size_;
  uint32_t last_ts_;

  std::vector<UpdateWalUnit> update_wal_list_;
  std::vector<std::string> message_vector_;  // holds the polled messages
  cppkafka::Configuration config_;
};

}  // namespace gs

#endif  // ENGINES_GRAPH_DB_DATABASE_WAL_KAFKA_WAL_PARSER_H_
