Skip to content

Commit 665760a

Browse files
Added try/catch mechanism
1 parent 1740e42 commit 665760a

File tree

2 files changed

+82
-4
lines changed

2 files changed

+82
-4
lines changed

memtier_benchmark.cpp

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1379,9 +1379,12 @@ struct cg_thread {
13791379
abstract_protocol* m_protocol;
13801380
pthread_t m_thread;
13811381
bool m_finished;
1382+
bool m_restart_requested;
1383+
unsigned int m_restart_count;
13821384

13831385
cg_thread(unsigned int id, benchmark_config* config, object_generator* obj_gen) :
1384-
m_thread_id(id), m_config(config), m_obj_gen(obj_gen), m_cg(NULL), m_protocol(NULL), m_finished(false)
1386+
m_thread_id(id), m_config(config), m_obj_gen(obj_gen), m_cg(NULL), m_protocol(NULL),
1387+
m_finished(false), m_restart_requested(false), m_restart_count(0)
13851388
{
13861389
m_protocol = protocol_factory(m_config->protocol);
13871390
assert(m_protocol != NULL);
@@ -1420,13 +1423,57 @@ struct cg_thread {
14201423
assert(ret == 0);
14211424
}
14221425

1426+
int restart(void)
1427+
{
1428+
// Clean up existing client group
1429+
if (m_cg != NULL) {
1430+
delete m_cg;
1431+
}
1432+
1433+
// Create new client group
1434+
m_cg = new client_group(m_config, m_protocol, m_obj_gen);
1435+
1436+
// Prepare new clients
1437+
if (m_cg->create_clients(m_config->clients) < (int) m_config->clients)
1438+
return -1;
1439+
if (m_cg->prepare() < 0)
1440+
return -1;
1441+
1442+
// Reset state
1443+
m_finished = false;
1444+
m_restart_requested = false;
1445+
m_restart_count++;
1446+
1447+
// Start new thread
1448+
return pthread_create(&m_thread, NULL, cg_thread_start, (void *)this);
1449+
}
1450+
14231451
};
14241452

14251453
static void* cg_thread_start(void *t)
14261454
{
14271455
cg_thread* thread = (cg_thread*) t;
1428-
thread->m_cg->run();
1429-
thread->m_finished = true;
1456+
1457+
try {
1458+
thread->m_cg->run();
1459+
1460+
// Check if we should restart due to connection failures
1461+
// If the thread finished but still has time left and connection errors, request restart
1462+
if (!g_shutdown_requested && thread->m_cg->get_total_connection_errors() > 0) {
1463+
benchmark_error_log("Thread %u finished due to connection failures, requesting restart.\n", thread->m_thread_id);
1464+
thread->m_restart_requested = true;
1465+
}
1466+
1467+
thread->m_finished = true;
1468+
} catch (const std::exception& e) {
1469+
benchmark_error_log("Thread %u caught exception: %s\n", thread->m_thread_id, e.what());
1470+
thread->m_finished = true;
1471+
thread->m_restart_requested = true;
1472+
} catch (...) {
1473+
benchmark_error_log("Thread %u caught unknown exception\n", thread->m_thread_id);
1474+
thread->m_finished = true;
1475+
thread->m_restart_requested = true;
1476+
}
14301477

14311478
return t;
14321479
}
@@ -1517,6 +1564,22 @@ run_stats run_benchmark(int run_id, benchmark_config* cfg, object_generator* obj
15171564
unsigned long int total_connection_errors = 0;
15181565

15191566
for (std::vector<cg_thread*>::iterator i = threads.begin(); i != threads.end(); i++) {
1567+
// Check if thread needs restart
1568+
if ((*i)->m_finished && (*i)->m_restart_requested && (*i)->m_restart_count < 5) {
1569+
benchmark_error_log("Restarting thread %u (restart #%u)...\n",
1570+
(*i)->m_thread_id, (*i)->m_restart_count + 1);
1571+
1572+
// Join the failed thread first
1573+
(*i)->join();
1574+
1575+
// Attempt to restart
1576+
if ((*i)->restart() == 0) {
1577+
benchmark_error_log("Thread %u restarted successfully.\n", (*i)->m_thread_id);
1578+
} else {
1579+
benchmark_error_log("Failed to restart thread %u.\n", (*i)->m_thread_id);
1580+
}
1581+
}
1582+
15201583
if (!(*i)->m_finished)
15211584
active_threads++;
15221585

shard_connection.cpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,11 @@ void shard_connection::handle_event(short events)
692692
event_add(m_reconnect_timer, &delay);
693693
m_reconnecting = true;
694694
} else {
695+
benchmark_error_log("Maximum reconnection attempts (%u) exceeded for connection error, triggering thread restart.\n",
696+
m_config->max_reconnect_attempts);
695697
disconnect();
698+
// Break the event loop to trigger thread restart
699+
event_base_loopbreak(m_event_base);
696700
}
697701

698702
return;
@@ -727,7 +731,11 @@ void shard_connection::handle_event(short events)
727731
event_add(m_reconnect_timer, &delay);
728732
m_reconnecting = true;
729733
} else {
734+
benchmark_error_log("Maximum reconnection attempts (%u) exceeded for connection drop, triggering thread restart.\n",
735+
m_config->max_reconnect_attempts);
730736
disconnect();
737+
// Break the event loop to trigger thread restart
738+
event_base_loopbreak(m_event_base);
731739
}
732740

733741
return;
@@ -768,11 +776,14 @@ void shard_connection::handle_reconnect_timer_event() {
768776
event_add(m_reconnect_timer, &delay);
769777
m_reconnecting = true;
770778
} else {
771-
benchmark_error_log("Maximum reconnection attempts (%u) exceeded, giving up.\n",
779+
benchmark_error_log("Maximum reconnection attempts (%u) exceeded, triggering thread restart.\n",
772780
m_config->max_reconnect_attempts);
773781
// Reset for potential future reconnections
774782
m_reconnect_attempts = 0;
775783
m_current_backoff_delay = 1.0;
784+
785+
// Break the event loop to trigger thread restart
786+
event_base_loopbreak(m_event_base);
776787
}
777788
} else {
778789
benchmark_error_log("Reconnection successful after %u attempts.\n", m_reconnect_attempts);
@@ -817,7 +828,11 @@ void shard_connection::handle_connection_timeout_event() {
817828
event_add(m_reconnect_timer, &delay);
818829
m_reconnecting = true;
819830
} else {
831+
benchmark_error_log("Maximum reconnection attempts (%u) exceeded for connection timeout, triggering thread restart.\n",
832+
m_config->max_reconnect_attempts);
820833
disconnect();
834+
// Break the event loop to trigger thread restart
835+
event_base_loopbreak(m_event_base);
821836
}
822837
}
823838

0 commit comments

Comments
 (0)