Skip to content

Commit bbb8149

Browse files
committed
v0.9.5
2 parents ccdf83b + 0fbe68a commit bbb8149

25 files changed

+496
-51
lines changed

.gitignore

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
bin/
77

8-
build/
8+
build*
9+
10+
output*
911

1012
*.user
1113

@@ -17,5 +19,3 @@ myframe/export.h
1719
myframe/config.h
1820
3rd/pkg/
1921
3rd/src/
20-
3rd/build/
21-
output/

3rd/CMakeLists.txt

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ ExternalProject_Add(
2323
-DBUILD_SHARED_LIBS=ON
2424
-DCMAKE_BUILD_TYPE=Release
2525
-DCMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX}
26+
-DCMAKE_TOOLCHAIN_FILE=${CMAKE_TOOLCHAIN_FILE}
2627
)
2728

2829
ExternalProject_Add(
@@ -42,6 +43,7 @@ ExternalProject_Add(
4243
-DWITH_GTEST=OFF
4344
-DCMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX}
4445
-DCMAKE_PREFIX_PATH=${CMAKE_INSTALL_PREFIX}
46+
-DCMAKE_TOOLCHAIN_FILE=${CMAKE_TOOLCHAIN_FILE}
4547
)
4648
ExternalProject_Add_StepDependencies(glog install gflags)
4749

@@ -64,4 +66,5 @@ ExternalProject_Add(
6466
-DJSONCPP_WITH_POST_BUILD_UNITTEST=OFF
6567
-DJSONCPP_WITH_PKGCONFIG_SUPPORT=OFF
6668
-DCMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX}
69+
-DCMAKE_TOOLCHAIN_FILE=${CMAKE_TOOLCHAIN_FILE}
6770
)

CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.10)
22
if (POLICY CMP0091)
33
cmake_policy(SET CMP0091 NEW)
44
endif()
5-
project(myframe VERSION 0.9.4)
5+
project(myframe VERSION 0.9.5)
66

77
### option
88
option(MYFRAME_USE_CV "Using conditional variables for thread communication" ON)

cpplint.bash

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ function main() {
1919
-name "*.cxx" -or \
2020
-name "*.cuh" \
2121
')' -and -not -path "./build/*" \
22+
-and -not -path "./output/*" \
23+
-and -not -path "./3rd/*" \
24+
-and -not -path "./myframe/export.h" \
2225
| xargs python3 ./cpplint.py
2326
}
2427

examples/CMakeLists.txt

+9-6
Original file line numberDiff line numberDiff line change
@@ -34,26 +34,29 @@ add_library(example_trans_obj SHARED example_trans_obj.cpp)
3434
target_link_libraries(example_trans_obj ${PROJECT_NAME})
3535

3636
### install
37-
FILE(GLOB conf_files "*.json")
38-
INSTALL(FILES
37+
file(GLOB conf_files "*.json")
38+
# 在unix like系统下禁用MYFRAME_USE_CV才能用,有需要再打开
39+
list(REMOVE_ITEM conf_files "${CMAKE_CURRENT_SOURCE_DIR}/example_worker_interactive_with_3rd_frame.json")
40+
install(FILES
3941
${conf_files}
4042
PERMISSIONS
4143
OWNER_READ OWNER_WRITE
4244
GROUP_READ
4345
WORLD_READ
4446
DESTINATION ${MYFRAME_SERVICE_DIR}
4547
)
46-
INSTALL(TARGETS
48+
install(TARGETS
4749
example_actor_helloworld
4850
example_actor_timer
4951
example_actor_serial
5052
example_actor_concurrent
5153
example_actor_subscribe
5254
example_node
53-
example_worker_actor_interactive
55+
example_worker_actor_interactive
5456
example_worker_publish
55-
example_worker_talk
56-
example_worker_interactive_with_3rd_frame
57+
example_worker_talk
58+
# 在unix like系统下禁用MYFRAME_USE_CV才能用,有需要再打开
59+
# example_worker_interactive_with_3rd_frame
5760
example_worker_quit
5861
example_config
5962
example_trans_obj

examples/example_config.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ class ExampleActorConfig : public myframe::Actor {
1717
int Init(const char* param) override {
1818
(void)param;
1919
auto conf = GetConfig();
20+
LOG(INFO) << GetActorName() << " pending queue size "
21+
<< GetMailbox()->GetPendingQueueSize();
22+
LOG(INFO) << GetActorName() << " run queue size "
23+
<< GetMailbox()->GetRunQueueSize();
2024
LOG(INFO) << GetActorName() << " conf " << conf->toStyledString();
2125
return 0;
2226
}

examples/example_config.json

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
{
77
"instance_name":"#1",
88
"instance_config":{
9+
"pending_queue_size":-1,
10+
"run_queue_size":-1,
911
"key1":"hello",
1012
"key2":"world"
1113
}

launcher/conf/sys.json

+2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
{
2+
"default_pending_queue_size":-1,
3+
"default_run_queue_size":-1,
24
"thread_poll_size":4,
35
"conn_event_size":2,
46
"warning_msg_size":10

launcher/launcher.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ int main(int argc, char** argv) {
7777
lib_dir.string(),
7878
module_args.GetThreadPoolSize(),
7979
module_args.GetConnEventSize(),
80-
module_args.GetWarningMsgSize())) {
80+
module_args.GetWarningMsgSize(),
81+
module_args.GetDefaultPendingQueueSize(),
82+
module_args.GetDefaultRunQueueSize())) {
8183
LOG(ERROR) << "Init failed";
8284
return -1;
8385
}

launcher/module_argument.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,14 @@ bool ModuleArgument::ParseSysConf(const std::string& sys_conf) {
131131
&& root["service_dir"].isString()) {
132132
conf_dir_ = root["service_dir"].asString();
133133
}
134+
if (root.isMember("default_pending_queue_size")
135+
&& root["default_pending_queue_size"].isInt()) {
136+
default_pending_queue_size_ = root["default_pending_queue_size"].asInt();
137+
}
138+
if (root.isMember("default_run_queue_size")
139+
&& root["default_run_queue_size"].isInt()) {
140+
default_run_queue_size_ = root["default_run_queue_size"].asInt();
141+
}
134142
return true;
135143
}
136144

launcher/module_argument.h

+8
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,21 @@ class ModuleArgument final {
2828
inline int GetThreadPoolSize() const { return thread_poll_size_; }
2929
inline int GetConnEventSize() const { return conn_event_size_; }
3030
inline int GetWarningMsgSize() const { return warning_msg_size_; }
31+
inline int GetDefaultPendingQueueSize() const {
32+
return default_pending_queue_size_;
33+
}
34+
inline int GetDefaultRunQueueSize() const {
35+
return default_run_queue_size_;
36+
}
3137

3238
private:
3339
bool ParseSysConf(const std::string&);
3440

3541
int thread_poll_size_{4};
3642
int conn_event_size_{2};
3743
int warning_msg_size_{10};
44+
int default_pending_queue_size_{-1};
45+
int default_run_queue_size_{-1};
3846
std::string log_dir_;
3947
std::string lib_dir_;
4048
std::string conf_dir_;

myframe/actor_context.cpp

+11
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,17 @@ ActorContext::ActorContext(
2525
, app_(app) {
2626
actor_->SetContext(this);
2727
mailbox_.SetAddr(actor_->GetActorName());
28+
int pending_queue_size = app->GetDefaultPendingQueueSize();
29+
int run_queue_size = app->GetDefaultRunQueueSize();
30+
auto cfg = actor_->GetConfig();
31+
if (cfg->isMember("pending_queue_size")) {
32+
pending_queue_size = cfg->get("pending_queue_size", -1).asInt();
33+
}
34+
if (cfg->isMember("run_queue_size")) {
35+
run_queue_size = cfg->get("run_queue_size", -1).asInt();
36+
}
37+
mailbox_.SetPendingQueueSize(pending_queue_size);
38+
mailbox_.SetRunQueueSize(run_queue_size);
2839
LOG(INFO) << mailbox_.Addr() << " context create";
2940
}
3041

myframe/actor_context_manager.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@ class ActorContextManager final {
3939
std::vector<std::string> GetAllActorAddr();
4040
bool HasActor(const std::string& name);
4141

42+
/* 将有消息的actor放入链表 */
43+
void PushContext(std::shared_ptr<ActorContext> ctx);
44+
4245
private:
4346
/* 获得actor名对应的actor */
4447
std::shared_ptr<ActorContext> GetContext(const std::string& actor_name);
45-
/* 将有消息的actor放入链表 */
46-
void PushContext(std::shared_ptr<ActorContext> ctx);
4748
void PrintWaitQueue();
4849

4950
/// 当前注册actor数量

myframe/app.cpp

+21-5
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ std::shared_ptr<WorkerTimer> App::GetTimerWorker() {
3434
LOG(ERROR) << "worker context manager is nullptr";
3535
return nullptr;
3636
}
37-
std::string worker_timer_name = "worker.timer.#1";
37+
std::string worker_timer_name = "worker.T.1";
3838
auto w = ev_mgr_->Get<WorkerContext>(worker_timer_name);
3939
if (w == nullptr) {
4040
LOG(ERROR)
@@ -63,14 +63,18 @@ bool App::Init(
6363
const std::string& lib_dir,
6464
int thread_pool_size,
6565
int event_conn_size,
66-
int warning_msg_size) {
66+
int warning_msg_size,
67+
int default_pending_queue_size,
68+
int default_run_queue_size) {
6769
if (state_.load() != kUninitialized) {
6870
return true;
6971
}
7072

7173
bool ret = true;
7274
lib_dir_ = lib_dir;
7375
warning_msg_size_.store(warning_msg_size);
76+
default_pending_queue_size_ = default_pending_queue_size;
77+
default_run_queue_size_ = default_run_queue_size;
7478
ret &= poller_->Init();
7579
ret &= worker_ctx_mgr_->Init(warning_msg_size);
7680
ret &= ev_conn_mgr_->Init(event_conn_size);
@@ -437,7 +441,7 @@ bool App::StartCommonWorker(int worker_count) {
437441
for (int i = 0; i < worker_count; ++i) {
438442
auto worker = std::make_shared<WorkerCommon>();
439443
worker->SetModName("class");
440-
worker->SetTypeName("WorkerCommon");
444+
worker->SetTypeName("C");
441445
if (!AddWorker(std::to_string(i), worker)) {
442446
LOG(ERROR) << "start common worker " << i << " failed";
443447
continue;
@@ -451,8 +455,8 @@ bool App::StartCommonWorker(int worker_count) {
451455
bool App::StartTimerWorker() {
452456
auto worker = std::make_shared<WorkerTimer>();
453457
worker->SetModName("class");
454-
worker->SetTypeName("timer");
455-
if (!AddWorker("#1", worker)) {
458+
worker->SetTypeName("T");
459+
if (!AddWorker("1", worker)) {
456460
LOG(ERROR) << "start timer worker failed";
457461
return false;
458462
}
@@ -557,6 +561,10 @@ void App::CheckStopWorkers() {
557561
worker_ctx_mgr_->PopFrontIdleWorker();
558562
auto common_idle_worker = worker_ctx->GetWorker<WorkerCommon>();
559563
common_idle_worker->SetActorContext(actor_ctx);
564+
// 接收队列不空,重新加入等待执行队列
565+
if (!actor_mailbox->RecvEmpty()) {
566+
actor_ctx_mgr_->PushContext(std::move(actor_ctx));
567+
}
560568
worker_ctx->GetCmdChannel()->SendToOwner(CmdChannel::Cmd::kRun);
561569
} else {
562570
LOG(ERROR) << actor_ctx->GetActor()->GetActorName() << " has no msg";
@@ -816,4 +824,12 @@ std::string App::GetLibName(const std::string& name) {
816824
#endif
817825
}
818826

827+
int App::GetDefaultPendingQueueSize() const {
828+
return default_pending_queue_size_;
829+
}
830+
831+
int App::GetDefaultRunQueueSize() const {
832+
return default_run_queue_size_;
833+
}
834+
819835
} // namespace myframe

myframe/app.h

+8-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this<App> {
5959
const std::string& lib_dir,
6060
int thread_pool_size = 4,
6161
int event_conn_size = 2,
62-
int warning_msg_size = 10);
62+
int warning_msg_size = 10,
63+
int default_pending_queue_size = -1,
64+
int default_run_queue_size = -1);
6365

6466
int LoadServiceFromDir(const std::string& path);
6567

@@ -91,6 +93,9 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this<App> {
9193

9294
void Quit();
9395

96+
int GetDefaultPendingQueueSize() const;
97+
int GetDefaultRunQueueSize() const;
98+
9499
private:
95100
bool CreateActorContext(
96101
const std::string& mod_name,
@@ -139,6 +144,8 @@ class MYFRAME_EXPORT App final : public std::enable_shared_from_this<App> {
139144
/// node地址
140145
std::string node_addr_;
141146
///
147+
int default_pending_queue_size_{-1};
148+
int default_run_queue_size_{-1};
142149
std::atomic<std::size_t> warning_msg_size_{10};
143150
std::atomic<State> state_{kUninitialized};
144151
std::recursive_mutex local_mtx_;

0 commit comments

Comments
 (0)