Skip to content

Commit a7e1487

Browse files
committed
fix ci
1 parent cf83036 commit a7e1487

File tree

7 files changed

+128
-54
lines changed

7 files changed

+128
-54
lines changed

.github/workflows/interactive.yml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ jobs:
371371
env:
372372
GS_TEST_DIR: ${{ github.workspace }}/gstest
373373
run: |
374+
cd ${GITHUB_WORKSPACE}/flex/build
374375
wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
375376
tar -zxf kafka_2.13-3.9.0.tgz
376377
cd kafka_2.13-3.9.0
@@ -379,10 +380,12 @@ jobs:
379380
bin/kafka-server-start.sh config/kraft/reconfig-server.properties &
380381
381382
bin/kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
382-
383-
./tests/hqps/kafka_test localhost:902 kafka-test
384-
385-
pkill -f kafka-server-start.sh
383+
../tests/hqps/kafka_test localhost:9092 kafka-test
384+
bin/kafka-topics.sh --delete --topic kafka-test --bootstrap-server localhost:9092
385+
bin/kafka-topics.sh --create --topic kafka-test --bootstrap-server localhost:9092
386+
../tests/hqps/kafka_wal_ingester_test .. localhost:9092 kafka-test
387+
bin/kafka-topics.sh --delete --topic kafka-test --bootstrap-server localhost:9092
388+
pkill -f kafka
386389
387390
- name: Run Gremlin test on modern graph
388391
env:

flex/engines/graph_db/app/kafka_wal_ingester_app.cc

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,20 +77,18 @@ bool KafkaWalIngesterApp::Query(GraphDBSession& graph, Decoder& input,
7777
Encoder& output) {
7878
// TODO: get value from config
7979
std::string kafka_brokers = std::string(input.get_string());
80-
std::string brokers = std::string(input.get_string());
81-
std::string topic_name = std::string(input.get_string());
8280
std::string group_id = std::string(input.get_string());
83-
std::string engine_endpoint = std::string(input.get_string());
81+
bool enable_auto_commit = input.get_byte();
82+
std::string topic_name = std::string(input.get_string());
8483
LOG(INFO) << "Kafka brokers: " << kafka_brokers;
85-
LOG(INFO) << "engine endpoint: " << engine_endpoint;
8684

8785
cppkafka::Configuration config = {{"metadata.broker.list", kafka_brokers},
8886
{"group.id", group_id},
8987
// Disable auto commit
90-
{"enable.auto.commit", false}};
88+
{"enable.auto.commit", enable_auto_commit}};
9189
gs::KafkaWalConsumer consumer(config, topic_name, 1);
9290
// TODO: how to make it stop
93-
while (!graph.db().kafka_wal_ingester_state()) {
91+
while (graph.db().kafka_wal_ingester_state()) {
9492
auto res = consumer.poll();
9593
if (res.empty()) {
9694
std::this_thread::sleep_for(gs::KafkaWalConsumer::POLL_TIMEOUT);
@@ -103,11 +101,13 @@ bool KafkaWalIngesterApp::Query(GraphDBSession& graph, Decoder& input,
103101
txn.IngestWal(graph.graph(), txn.timestamp(),
104102
const_cast<char*>(res.data()) + sizeof(WalHeader),
105103
header->length, txn.allocator());
104+
txn.Commit();
106105
} else if (header->type == 1) {
107106
auto txn = graph.GetUpdateTransaction();
108107
txn.IngestWal(graph.graph(), graph.db().work_dir(), txn.timestamp(),
109108
const_cast<char*>(res.data()) + sizeof(WalHeader),
110109
header->length, txn.allocator());
110+
txn.Commit();
111111
} else {
112112
LOG(ERROR) << "Unknown wal type: " << header->type;
113113
}

flex/engines/graph_db/database/graph_db.cc

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -283,24 +283,25 @@ bool GraphDB::kafka_wal_ingester_state() const {
283283
return kafka_wal_ingester_thread_running_.load();
284284
}
285285

286-
void GraphDB::start_kafka_wal_ingester(const std::string& kafka_brokers,
287-
const std::string& brokers,
288-
const std::string& topic_name,
289-
const std::string& group_id,
290-
const std::string& engine_endpoint) {
286+
void GraphDB::start_kafka_wal_ingester(const cppkafka::Configuration& config,
287+
const std::string& topic_name) {
291288
if (kafka_wal_ingester_thread_running_) {
292289
kafka_wal_ingester_thread_running_ = false;
293-
kafka_wal_ingester_thread_.join();
290+
if (kafka_wal_ingester_thread_.joinable()) {
291+
kafka_wal_ingester_thread_.join();
292+
}
294293
}
295-
std::vector<char> buffer;
296-
gs::Encoder encoder(buffer);
297-
encoder.put_string(kafka_brokers);
298-
encoder.put_string(brokers);
299-
encoder.put_string(topic_name);
300-
encoder.put_string(group_id);
301-
encoder.put_string(engine_endpoint);
302-
gs::Decoder decoder(buffer.data(), buffer.size());
303-
KafkaWalIngesterApp().Query(GetSession(0), decoder, encoder);
294+
kafka_wal_ingester_thread_running_ = true;
295+
kafka_wal_ingester_thread_ = std::thread([&]() {
296+
std::vector<char> buffer;
297+
gs::Encoder encoder(buffer);
298+
encoder.put_string(config.get("metadata.broker.list"));
299+
encoder.put_string(config.get("group.id"));
300+
encoder.put_byte(config.get("enable.auto.commit") == "true");
301+
encoder.put_string(topic_name);
302+
gs::Decoder decoder(buffer.data(), buffer.size());
303+
KafkaWalIngesterApp().Query(GetSession(0), decoder, encoder);
304+
});
304305
}
305306

306307
void GraphDB::stop_kafka_wal_ingester() {

flex/engines/graph_db/database/graph_db.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
#include "flex/storages/rt_mutable_graph/loader/loader_factory.h"
3434
#include "flex/storages/rt_mutable_graph/loading_config.h"
3535
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"
36+
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
37+
#include "cppkafka/cppkafka.h"
38+
#endif
3639

3740
namespace gs {
3841

@@ -172,11 +175,8 @@ class GraphDB {
172175
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
173176
bool kafka_wal_ingester_state() const;
174177

175-
void start_kafka_wal_ingester(const std::string& kafka_brokers,
176-
const std::string& brokers,
177-
const std::string& topic_name,
178-
const std::string& group_id,
179-
const std::string& engine_endpoint);
178+
void start_kafka_wal_ingester(const cppkafka::Configuration& config,
179+
const std::string& topic_name);
180180

181181
void stop_kafka_wal_ingester();
182182
#endif

flex/engines/graph_db/database/wal/kafka_wal_parser.cc

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@
1414
*/
1515

1616
#include "flex/engines/graph_db/database/wal/kafka_wal_parser.h"
17-
#include <fcntl.h>
18-
#include <sys/mman.h>
19-
#include <unistd.h>
20-
#include <filesystem>
2117
#include "flex/engines/graph_db/database/wal/wal.h"
2218

2319
namespace gs {
@@ -40,7 +36,7 @@ std::vector<cppkafka::TopicPartition> get_all_topic_partitions(
4036
return partitions;
4137
}
4238

43-
std::unique_ptr<IWalParser> KafkaWalParser::Make() {
39+
std::unique_ptr<IWalParser> KafkaWalParser::Make(const std::string&) {
4440
const char* broker_list = std::getenv("KAFKA_BROKER_LIST");
4541
if (broker_list == nullptr) {
4642
LOG(FATAL) << "KAFKA_BROKER_LIST is not set";
@@ -53,11 +49,7 @@ std::unique_ptr<IWalParser> KafkaWalParser::Make() {
5349
}
5450

5551
KafkaWalParser::KafkaWalParser(const cppkafka::Configuration& config)
56-
: consumer_(nullptr),
57-
insert_wal_list_(NULL),
58-
insert_wal_list_size_(0),
59-
last_ts_(0),
60-
config_(config) {
52+
: consumer_(nullptr), last_ts_(0), config_(config) {
6153
consumer_ = std::make_unique<cppkafka::Consumer>(config);
6254
}
6355

@@ -69,11 +61,7 @@ void KafkaWalParser::open(const std::string& topic_name) {
6961
void KafkaWalParser::open(
7062
const std::vector<cppkafka::TopicPartition>& topic_partitions) {
7163
consumer_->assign(topic_partitions);
72-
insert_wal_list_ = static_cast<WalContentUnit*>(
73-
mmap(NULL, IWalWriter::MAX_WALS_NUM * sizeof(WalContentUnit),
74-
PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_NORESERVE,
75-
-1, 0));
76-
insert_wal_list_size_ = IWalWriter::MAX_WALS_NUM;
64+
insert_wal_list_.resize(4096);
7765

7866
while (true) {
7967
auto msgs = consumer_->poll_batch(MAX_BATCH_SIZE);
@@ -95,7 +83,7 @@ void KafkaWalParser::open(
9583
}
9684

9785
for (auto& wal : message_vector_) {
98-
LOG(INFO) << "Got wal:" << wal.size();
86+
VLOG(1) << "Got wal:" << wal.size();
9987
const char* payload = wal.data();
10088
const WalHeader* header = reinterpret_cast<const WalHeader*>(payload);
10189
uint32_t cur_ts = header->timestamp;
@@ -111,6 +99,9 @@ void KafkaWalParser::open(
11199
unit.size = length;
112100
update_wal_list_.push_back(unit);
113101
} else {
102+
if (cur_ts >= insert_wal_list_.size()) {
103+
insert_wal_list_.resize(cur_ts + 1);
104+
}
114105
if (insert_wal_list_[cur_ts].ptr) {
115106
LOG(WARNING) << "Duplicated timestamp " << cur_ts << ", skip";
116107
}
@@ -134,9 +125,7 @@ void KafkaWalParser::close() {
134125
if (consumer_) {
135126
consumer_.reset();
136127
}
137-
if (insert_wal_list_ != NULL) {
138-
munmap(insert_wal_list_, insert_wal_list_size_ * sizeof(WalContentUnit));
139-
}
128+
insert_wal_list_.clear();
140129
}
141130

142131
uint32_t KafkaWalParser::last_ts() const { return last_ts_; }
@@ -151,4 +140,8 @@ const std::vector<UpdateWalUnit>& KafkaWalParser::get_update_wals() const {
151140
return update_wal_list_;
152141
}
153142

143+
const bool KafkaWalParser::registered_ = WalParserFactory::RegisterWalParser(
144+
"kafaka", static_cast<WalParserFactory::wal_parser_initializer_t>(
145+
&KafkaWalParser::Make));
146+
154147
} // namespace gs

flex/engines/graph_db/database/wal/kafka_wal_parser.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
#define ENGINES_GRAPH_DB_DATABASE_WAL_KAFKA_WAL_PARSER_H_
1818

1919
#include <vector>
20-
#include "flex/engines/graph_db/database/wal/wal.h"
21-
2220
#include "cppkafka/cppkafka.h"
21+
#include "flex/engines/graph_db/database/wal/wal.h"
2322

2423
namespace gs {
2524

@@ -35,7 +34,7 @@ class KafkaWalParser : public IWalParser {
3534
std::chrono::milliseconds(100);
3635
static constexpr const size_t MAX_BATCH_SIZE = 1000;
3736

38-
static std::unique_ptr<IWalParser> Make();
37+
static std::unique_ptr<IWalParser> Make(const std::string&);
3938

4039
// always track all partitions and from begining
4140
KafkaWalParser(const cppkafka::Configuration& config);
@@ -53,13 +52,14 @@ class KafkaWalParser : public IWalParser {
5352

5453
private:
5554
std::unique_ptr<cppkafka::Consumer> consumer_;
56-
WalContentUnit* insert_wal_list_;
57-
size_t insert_wal_list_size_;
55+
std::vector<WalContentUnit> insert_wal_list_;
5856
uint32_t last_ts_;
5957

6058
std::vector<UpdateWalUnit> update_wal_list_;
6159
std::vector<std::string> message_vector_; // used to hold the polled messages
6260
cppkafka::Configuration config_;
61+
62+
static const bool registered_;
6363
};
6464

6565
} // namespace gs
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/** Copyright 2020 Alibaba Group Holding Limited.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
#include <iostream>
16+
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
17+
#include <flex/engines/graph_db/database/wal/kafka_wal_parser.h>
18+
#include <flex/engines/graph_db/database/wal/kafka_wal_writer.h>
19+
#endif
20+
#include "grape/serialization/in_archive.h"
21+
#include "grape/serialization/out_archive.h"
22+
23+
#include "flex/engines/graph_db/database/graph_db.h"
24+
#include "flex/engines/graph_db/database/graph_db_session.h"
25+
#include "flex/storages/rt_mutable_graph/schema.h"
26+
27+
int main(int argc, char** argv) {
28+
#ifdef BUILD_KAFKA_WAL_WRITER_PARSER
29+
gs::Schema schema;
30+
schema.add_vertex_label(
31+
"PERSON",
32+
{
33+
gs::PropertyType::kInt64, // version
34+
},
35+
{"weight"},
36+
{std::tuple<gs::PropertyType, std::string, size_t>(
37+
gs::PropertyType::kInt64, "id", 0)},
38+
{gs::StorageStrategy::kMem, gs::StorageStrategy::kMem}, 4096);
39+
gs::GraphDB db;
40+
std::string work_dir = argv[1];
41+
std::string kafka_brokers = argv[2];
42+
std::string kafka_topic = argv[3];
43+
db.Open(schema, work_dir, 1);
44+
45+
gs::KafkaWalWriter writer(kafka_brokers);
46+
writer.open(kafka_topic, 0);
47+
grape::InArchive in_archive;
48+
in_archive.Resize(sizeof(gs::WalHeader));
49+
gs::label_t label = db.schema().get_vertex_label_id("PERSON");
50+
in_archive << static_cast<uint8_t>(0) << label;
51+
int64_t id = 998244353;
52+
in_archive << id;
53+
int64_t weight = 100;
54+
in_archive << weight;
55+
auto header = reinterpret_cast<gs::WalHeader*>(in_archive.GetBuffer());
56+
header->timestamp = 1;
57+
header->type = 0;
58+
header->length = in_archive.GetSize() - sizeof(gs::WalHeader);
59+
writer.append(in_archive.GetBuffer(), in_archive.GetSize());
60+
writer.close();
61+
cppkafka::Configuration config = {{"metadata.broker.list", kafka_brokers},
62+
{"group.id", "test"},
63+
{"enable.auto.commit", false}};
64+
db.start_kafka_wal_ingester(config, kafka_topic);
65+
std::this_thread::sleep_for(std::chrono::seconds(1));
66+
db.stop_kafka_wal_ingester();
67+
auto txn = db.GetReadTransaction(0);
68+
gs::vid_t lid;
69+
CHECK(txn.GetVertexNum(label) == 1);
70+
CHECK(txn.GetVertexIndex(label, id, lid));
71+
auto iter = txn.GetVertexIterator(label);
72+
CHECK(iter.GetField(0).AsInt64() == 100);
73+
std::cout << "Vertex id: " << lid << std::endl;
74+
db.Close();
75+
#endif
76+
return 0;
77+
}

0 commit comments

Comments
 (0)