Skip to content

Commit 6f18e38

Browse files
committed
refactor: improve worker service management and time handling
- Add time offset accessor method to support timestamp synchronization - Refactor worker service lifecycle management with better error handling - Extract service ID allocation logic into dedicated method for clarity - Optimize worker message processing with improved queue handling - Enhance service scanning with pre-allocated memory and proper JSON formatting - Improve code quality with const-correctness and structured bindings Key improvements: - Replace memory_order_release with memory_order_relaxed for better performance - Add early return for empty service collections to avoid unnecessary processing - Use structured bindings for cleaner iteration over service containers - Separate service ID allocation logic for better maintainability - Add comprehensive error handling for service creation failures Performance optimizations: - Pre-allocate string capacity for service scan responses - Optimize service removal with streamlined state management - Improve message queue processing with cached service references
1 parent ea6f461 commit 6f18e38

File tree

3 files changed

+169
-119
lines changed

3 files changed

+169
-119
lines changed

src/common/time.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ class time {
2929
return true;
3030
}
3131

32+
static std::time_t offset() {
33+
return offset_;
34+
}
35+
3236
static std::time_t now() {
3337
auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(
3438
std::chrono::steady_clock::now() - start_time_point_

src/moon/core/worker.cpp

Lines changed: 162 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ void worker::run() {
3737

3838
void worker::stop() {
3939
asio::post(io_ctx_, [this] {
40-
message m = message { PTYPE_SHUTDOWN, 0, 0, 0 };
41-
for (auto& it: services_) {
42-
it.second->dispatch(&m);
40+
auto m = message { PTYPE_SHUTDOWN, 0, 0, 0 };
41+
for (const auto&[k,v]: services_) {
42+
v->dispatch(&m);
4343
}
4444
});
4545
}
@@ -51,146 +51,185 @@ void worker::wait() {
5151
}
5252
}
5353

54-
void worker::signal(int val) {
54+
void worker::signal(int val) const{
5555
if (auto s = current_.load(std::memory_order_acquire); s != nullptr) {
5656
s->signal(val);
5757
}
5858
}
5959

60-
void worker::new_service(std::unique_ptr<service_conf> conf) {
61-
count_.fetch_add(1, std::memory_order_release);
62-
asio::post(io_ctx_, [this, conf = std::move(conf)]() {
63-
do {
64-
size_t counter = 0;
65-
uint32_t serviceid = conf->opt_service_id;
66-
if (serviceid == 0) {
67-
do {
68-
if (counter >= WORKER_MAX_SERVICE) {
69-
serviceid = 0;
70-
CONSOLE_ERROR(
71-
"new service failed: can not get more service id. worker[%u] service num[%zu].",
72-
id(),
73-
services_.size()
74-
);
75-
break;
76-
}
60+
uint32_t worker::allocate_service_id(uint32_t opt_service_id) {
61+
// Use specified service ID if provided
62+
if (opt_service_id != 0) {
63+
if (services_.find(opt_service_id) != services_.end()) {
64+
CONSOLE_ERROR(
65+
"new service failed: serviceid[%08X] already exists, worker[%u] service num[%zu].",
66+
opt_service_id,
67+
id(),
68+
services_.size()
69+
);
70+
return 0;
71+
}
72+
return opt_service_id;
73+
}
7774

78-
++nextid_;
79-
if (nextid_ == WORKER_MAX_SERVICE) {
80-
nextid_ = 1;
81-
}
82-
serviceid = nextid_ | (id() << WORKER_ID_SHIFT);
83-
++counter;
84-
} while (services_.find(serviceid) != services_.end());
75+
// Auto-allocate service ID
76+
size_t counter = 0;
77+
uint32_t serviceid = 0;
78+
79+
do {
80+
if (counter >= WORKER_MAX_SERVICE) {
81+
CONSOLE_ERROR(
82+
"new service failed: can not get more service id. worker[%u] service num[%zu].",
83+
id(),
84+
services_.size()
85+
);
86+
return 0;
87+
}
8588

86-
if (serviceid == 0) {
87-
break;
88-
}
89-
}else{
90-
if (services_.find(serviceid) != services_.end()) {
91-
serviceid = 0;
92-
CONSOLE_ERROR(
93-
"new service failed: serviceid[%08X] already exists, worker[%u] service num[%zu].",
94-
serviceid,
95-
id(),
96-
services_.size()
97-
);
98-
break;
99-
}
100-
}
89+
++nextid_;
90+
if (nextid_ == WORKER_MAX_SERVICE) {
91+
nextid_ = 1;
92+
}
93+
serviceid = nextid_ | (id() << WORKER_ID_SHIFT);
94+
++counter;
95+
} while (services_.find(serviceid) != services_.end());
10196

102-
auto s = server_->make_service(conf->type);
103-
MOON_ASSERT(
104-
s,
105-
moon::format(
106-
"new service failed:service type[%s] was not registered",
107-
conf->type.data()
108-
)
109-
.data()
110-
);
111-
s->set_id(serviceid);
112-
s->set_unique(conf->unique);
113-
s->set_server_context(server_, this);
97+
return serviceid;
98+
}
11499

115-
if (!s->init(*conf)) {
116-
if (serviceid == BOOTSTRAP_ADDR) {
117-
server_->stop(-1);
118-
}
119-
break;
100+
void worker::new_service(std::unique_ptr<service_conf> conf) {
101+
count_.fetch_add(1, std::memory_order_relaxed);
102+
asio::post(io_ctx_, [this, conf = std::move(conf)]() {
103+
// Allocate service ID
104+
uint32_t serviceid = allocate_service_id(conf->opt_service_id);
105+
if (serviceid == 0) {
106+
count_.fetch_sub(1, std::memory_order_relaxed);
107+
if (services_.empty()) {
108+
shared(true);
109+
}
110+
if (conf->session != 0) {
111+
server_->send_message(message { PTYPE_INTEGER, 0, conf->creator, conf->session, 0 });
120112
}
121-
s->ok(true);
122-
services_.emplace(serviceid, std::move(s));
113+
return;
114+
}
123115

124-
if (0 != conf->session) {
125-
server_->send_message(
126-
message { PTYPE_INTEGER, 0, conf->creator, conf->session, serviceid }
127-
);
116+
// Create service instance
117+
auto s = server_->make_service(conf->type);
118+
if (!s) {
119+
CONSOLE_ERROR(
120+
"new service failed: service type[%s] was not registered",
121+
conf->type.data()
122+
);
123+
count_.fetch_sub(1, std::memory_order_relaxed);
124+
if (services_.empty()) {
125+
shared(true);
126+
}
127+
if (conf->session != 0) {
128+
server_->send_message(message { PTYPE_INTEGER, 0, conf->creator, conf->session, 0 });
128129
}
129130
return;
130-
} while (false);
131+
}
131132

132-
count_.fetch_sub(1, std::memory_order_release);
133-
if (services_.empty()) {
134-
shared(true);
133+
// Initialize service
134+
s->set_id(serviceid);
135+
s->set_unique(conf->unique);
136+
s->set_server_context(server_, this);
137+
138+
if (!s->init(*conf)) {
139+
if (serviceid == BOOTSTRAP_ADDR) {
140+
server_->stop(-1);
141+
}
142+
count_.fetch_sub(1, std::memory_order_relaxed);
143+
if (services_.empty()) {
144+
shared(true);
145+
}
146+
if (conf->session != 0) {
147+
server_->send_message(message { PTYPE_INTEGER, 0, conf->creator, conf->session, 0 });
148+
}
149+
return;
135150
}
136151

137-
if (0 != conf->session) {
138-
server_->send_message(message { PTYPE_INTEGER, 0, conf->creator, conf->session, 0 });
152+
// Service created successfully
153+
s->ok(true);
154+
services_.emplace(serviceid, std::move(s));
155+
156+
if (conf->session != 0) {
157+
server_->send_message(
158+
message { PTYPE_INTEGER, 0, conf->creator, conf->session, serviceid }
159+
);
139160
}
140161
});
141162
}
142163

143164
void worker::remove_service(uint32_t serviceid, uint32_t sender, int64_t sessionid) {
144165
asio::post(io_ctx_, [this, serviceid, sender, sessionid]() {
145-
if (auto s = find_service(serviceid); nullptr != s) {
146-
auto name = s->name();
147-
auto id = s->id();
148-
count_.fetch_sub(1, std::memory_order_release);
149-
server_->response(sender, "service destroy"sv, sessionid);
150-
services_.erase(serviceid);
151-
if (services_.empty())
152-
shared(true);
153-
154-
if (server_->get_state() == state::ready) {
155-
auto content =
156-
moon::format("_service_exit,name:%s serviceid:%08X", name.data(), id);
157-
auto buf = buffer { content.size() };
158-
buf.write_back(content);
159-
server_->broadcast(serviceid, buf, PTYPE_SYSTEM);
160-
}
161-
162-
if (serviceid == BOOTSTRAP_ADDR) {
163-
server_->set_state(state::stopping);
164-
}
165-
} else {
166+
auto s = find_service(serviceid);
167+
if (nullptr == s) {
166168
server_->response(
167169
sender,
168170
moon::format("worker::remove_service [%08X] not found", serviceid),
169171
sessionid,
170172
PTYPE_ERROR
171173
);
174+
return;
175+
}
176+
177+
auto name = s->name();
178+
auto id = s->id();
179+
count_.fetch_sub(1, std::memory_order_relaxed);
180+
services_.erase(serviceid);
181+
182+
// Mark worker as shared if no services remain
183+
if (services_.empty()) {
184+
shared(true);
185+
}
186+
187+
server_->response(sender, "service destroy"sv, sessionid);
188+
189+
// Broadcast service exit notification (only if server is ready)
190+
if (server_->get_state() == state::ready) {
191+
auto content = moon::format("_service_exit,name:%s serviceid:%08X", name.data(), id);
192+
auto buf = buffer { content.size() };
193+
buf.write_back(content);
194+
server_->broadcast(serviceid, buf, PTYPE_SYSTEM);
195+
}
196+
197+
// Check if bootstrap service was removed
198+
if (serviceid == BOOTSTRAP_ADDR) {
199+
server_->set_state(state::stopping);
172200
}
173201
});
174202
}
175203

176204
void worker::scan(uint32_t sender, int64_t sessionid) {
177205
asio::post(io_ctx_, [this, sender, sessionid] {
206+
if (services_.empty()) {
207+
server_->response(sender, std::string_view{}, sessionid);
208+
return;
209+
}
210+
178211
std::string content;
179-
for (const auto& it: services_) {
180-
if (content.empty())
181-
content.append("[");
212+
// Pre-allocate approximate space: each entry ~50 chars
213+
content.reserve(services_.size() * 50 + 2);
214+
content.append("[");
215+
216+
bool first = true;
217+
for (const auto& [k,v]: services_) {
218+
if (!first) {
219+
content.append(",");
220+
}
221+
first = false;
182222

183223
content.append(
184224
moon::format(
185-
R"({"name":"%s","serviceid":"%X"},)",
186-
it.second->name().data(),
187-
it.second->id()
225+
R"({"name":"%s","serviceid":"%X"})",
226+
v->name().data(),
227+
v->id()
188228
)
189229
);
190230
}
191231

192-
if (!content.empty())
193-
content.back() = ']';
232+
content.append("]");
194233
server_->response(sender, content, sessionid);
195234
});
196235
}
@@ -202,17 +241,23 @@ asio::io_context& worker::io_context() {
202241
void worker::send(message&& msg) {
203242
if (mq_.push_back(std::move(msg)) == 1) {
204243
asio::post(io_ctx_, [this]() {
205-
if (auto& read_queue = mq_.swap_on_read(); !read_queue.empty()) {
206-
auto size = read_queue.size();
207-
swapped_size_.store(size, std::memory_order_relaxed);
208-
service* s = nullptr;
209-
for (auto& m: read_queue) {
210-
s = handle_one(s, std::move(m));
211-
swapped_size_.store(--size, std::memory_order_relaxed);
212-
}
213-
read_queue.clear();
214-
current_.store(nullptr, std::memory_order_relaxed);
244+
auto& read_queue = mq_.swap_on_read();
245+
if (read_queue.empty()) {
246+
return;
247+
}
248+
249+
auto size = read_queue.size();
250+
swapped_size_.store(size, std::memory_order_relaxed);
251+
252+
// Process all messages in the queue
253+
service* cached_service = nullptr;
254+
for (auto& m: read_queue) {
255+
cached_service = handle_one(cached_service, std::move(m));
256+
swapped_size_.store(--size, std::memory_order_relaxed);
215257
}
258+
259+
read_queue.clear();
260+
current_.store(nullptr, std::memory_order_relaxed);
216261
});
217262
}
218263
}
@@ -222,8 +267,7 @@ uint32_t worker::id() const {
222267
}
223268

224269
service* worker::find_service(uint32_t serviceid) const {
225-
auto iter = services_.find(serviceid);
226-
if (services_.end() != iter) {
270+
if (auto iter = services_.find(serviceid); services_.end() != iter) {
227271
return iter->second.get();
228272
}
229273
return nullptr;
@@ -287,14 +331,14 @@ service* worker::handle_one(service* s, message&& msg) {
287331
return s->ok() ? s : nullptr;
288332
}
289333

290-
for (auto& it: services_) {
291-
if (!it.second->unique() && type == PTYPE_SYSTEM) {
334+
for (const auto& [k,v]: services_) {
335+
if (!v->unique() && type == PTYPE_SYSTEM) {
292336
continue;
293337
}
294338

295-
if (it.second->ok() && it.second->id() != sender) {
296-
current_.store(it.second.get(), std::memory_order_release);
297-
handle_message(it.second, msg);
339+
if (v->ok() && v->id() != sender) {
340+
current_.store(v.get(), std::memory_order_release);
341+
handle_message(v, msg);
298342
}
299343
}
300344
return nullptr;

src/moon/core/worker.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,15 @@ class worker {
6262

6363
void wait();
6464

65-
void signal(int val);
65+
void signal(int val) const;
6666

6767
private:
6868
service* handle_one(service* s, message&& msg);
6969

7070
service* find_service(uint32_t serviceid) const;
7171

72+
uint32_t allocate_service_id(uint32_t opt_service_id);
73+
7274
private:
7375
std::atomic_bool shared_ = true;
7476
std::atomic_uint32_t count_ = 0;

0 commit comments

Comments
 (0)