Commit 12184a0

Message: added health checker
Author: g2px1
1 parent 31cd88e, commit 12184a0

7 files changed

Lines changed: 269 additions & 60 deletions

CMakeLists.txt

Lines changed: 3 additions & 1 deletion
@@ -38,7 +38,9 @@ file(GLOB_RECURSE UPQ_SOURCES CONFIGURE_DEPENDS
 )
 
 add_library(upq ${UPQ_SOURCES} ${UPQ_HEADERS}
-        include/upq/PgNotificationMultiplexer.h)
+        include/upq/PgNotificationMultiplexer.h
+        include/upq/PgHealthChecker.h
+        src/upq/PgHealthChecker.cpp)
 add_library(usub::upq ALIAS upq)
 
 target_include_directories(upq

docs/pool.md

Lines changed: 92 additions & 32 deletions
@@ -23,31 +23,94 @@ It guarantees **no blocking**, **no busy-waiting**, and **safe use from multiple
 
 ## Initialization
 
+`PgPool` now optionally supports a background health checker for periodic connection validation.
+
 ```cpp
 usub::pg::PgPool::init_global(
     host, port, user, db, password,
-    max_pool_size, queue_capacity
+    max_pool_size, queue_capacity,
+    usub::pg::PgPoolHealthConfig{
+        .enabled = true,
+        .interval_ms = 3000
+    }
 );
 ```
 
-Example:
+When `.enabled = true`, the pool automatically spawns an internal coroutine that periodically runs lightweight
+`SELECT 1;` probes on temporary connections to ensure the database is reachable.
+
+If `.enabled = false` (default), the health checker is disabled.
+
+You can access runtime statistics at any time via:
 
 ```cpp
-usub::pg::PgPool::init_global(
-    "localhost",
-    "5432",
-    "postgres",
-    "mydb",
-    "password",
-    32, // max_pool_size
-    64  // queue_capacity
-);
+auto& hc = usub::pg::PgPool::instance().health_checker();
+auto& stats = hc.stats();
+
+std::cout
+    << "checks=" << stats.iterations.load()
+    << " ok=" << stats.ok_checks.load()
+    << " failed=" << stats.failed_checks.load()
+    << std::endl;
+```
+
+---
+
+## Health Checker
+
+`PgHealthChecker` is a lightweight background monitor built into the pool.
+It ensures early detection of connection failures without impacting performance.
+
+| Field           | Type               | Description                                     |
+|-----------------|--------------------|-------------------------------------------------|
+| `enabled`       | `bool`             | Whether the health checker coroutine should run |
+| `interval_ms`   | `uint64_t`         | Delay between health probes, in milliseconds    |
+| `iterations`    | `atomic<uint64_t>` | Total number of health iterations               |
+| `ok_checks`     | `atomic<uint64_t>` | Successful `SELECT 1` responses                 |
+| `failed_checks` | `atomic<uint64_t>` | Failed or unreachable checks                    |
+
+### Behavior
 
-auto& pool = usub::pg::PgPool::instance();
+* When enabled, a coroutine runs forever in the background.
+* Every interval:
+
+    1. Acquires a fresh connection from the pool.
+    2. Executes `SELECT 1;` using non-blocking I/O.
+    3. Updates internal counters.
+    4. Returns the connection back to the pool.
+* Failures are silent — they do not throw or disrupt normal queries.
+* Intended for observability and proactive reconnection handling.
+
+### Example
+
+```cpp
+task::Awaitable<void> print_pg_health()
+{
+    auto& pool = usub::pg::PgPool::instance();
+    auto& hc = pool.health_checker();
+
+    for (;;)
+    {
+        auto& s = hc.stats();
+        std::cout
+            << "[Health] iter=" << s.iterations.load()
+            << " ok=" << s.ok_checks.load()
+            << " fail=" << s.failed_checks.load()
+            << std::endl;
+
+        co_await usub::uvent::system::this_coroutine::sleep_for(
+            std::chrono::seconds(5)
+        );
+    }
+}
 ```
 
-This must be called **once** during startup.
-`instance()` returns the global singleton thereafter.
+### Notes
+
+* Uses existing event loop and coroutine scheduler (`uvent`).
+* Generates negligible load on the database (`SELECT 1`).
+* Default interval: **600 000 ms** (10 minutes).
+* Can be disabled completely if unnecessary.
 
 ---
 
@@ -201,10 +264,6 @@ This provides natural backpressure and keeps memory footprint predictable.
 
 ---
 
-## Error transparency (since v1.0.1)
-
-All queries and internal operations now report structured diagnostic codes.
-
 ### Example: Connection loss
 
 ```cpp
@@ -244,25 +303,26 @@ if (!res.ok && res.code == PgErrorCode::ServerError)
 
 ## Best practices
 
-- ✅ Initialize pool early (before spawning coroutines).
-- ✅ Use `query_awaitable()` for short-lived queries.
-- ✅ Use manual `acquire_connection()` for long-running listeners or transactions.
-- ✅ Check `QueryResult.code` and `ok` for every query.
-- ✅ Release connections when finished — don’t hold them across awaits.
-- ❌ Don’t share one connection across threads.
-- ❌ Don’t assume immediate connection creation — first queries may take longer.
+* ✅ Initialize pool early (before spawning coroutines).
+* ✅ Use `query_awaitable()` for short-lived queries.
+* ✅ Use manual `acquire_connection()` for long-running listeners or transactions.
+* ✅ Check `QueryResult.code` and `ok` for every query.
+* ✅ Release connections when finished — don’t hold them across awaits.
+* ❌ Don’t share one connection across threads.
+* ❌ Don’t assume immediate connection creation — first queries may take longer.
 
 ---
 
 ## Summary
 
-| Feature      | Description                               |
-|--------------|-------------------------------------------|
-| Asynchronous | Non-blocking coroutine API                |
-| Safe         | Lock-free concurrent access               |
-| Lazy         | Creates connections only as needed        |
-| Bounded      | Max live connections controlled by config |
-| Transparent  | Every error is classified and traceable   |
+| Feature        | Description                               |
+|----------------|-------------------------------------------|
+| Asynchronous   | Non-blocking coroutine API                |
+| Safe           | Lock-free concurrent access               |
+| Lazy           | Creates connections only as needed        |
+| Bounded        | Max live connections controlled by config |
+| Transparent    | Every error is classified and traceable   |
+| Health Checker | Background connection heartbeat loop      |
 
 `PgPool` is the backbone of the upq runtime — it provides scalable, coroutine-friendly database access with
 deterministic failure semantics.
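
Review note: the probe cycle listed under "Behavior" in `docs/pool.md` (acquire, run `SELECT 1;`, update counters, release) can be sketched without the coroutine machinery. This is a minimal synchronous sketch under stated assumptions, not the library's code: `HealthCounters` mirrors the three counters of `PgHealthStats`, while `run_one_check` and its `probe` callable are hypothetical stand-ins for acquiring a connection and executing the query.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>

// Mirrors the three counters of PgHealthStats from this commit.
struct HealthCounters
{
    std::atomic<std::uint64_t> iterations{0};
    std::atomic<std::uint64_t> ok_checks{0};
    std::atomic<std::uint64_t> failed_checks{0};
};

// Hypothetical helper: performs one health iteration.
// `probe` stands in for "acquire a connection and run SELECT 1";
// it returns true when the probe succeeded.
inline bool run_one_check(HealthCounters& stats, const std::function<bool()>& probe)
{
    // The iteration is counted before the probe runs, as in the commit.
    stats.iterations.fetch_add(1, std::memory_order_relaxed);

    const bool alive = probe();
    if (alive)
        stats.ok_checks.fetch_add(1, std::memory_order_relaxed);
    else
        stats.failed_checks.fetch_add(1, std::memory_order_relaxed);

    // A failure is only recorded; nothing is thrown to the caller.
    return alive;
}
```

Once a check has finished, `iterations` equals `ok_checks + failed_checks`; relaxed ordering is enough here because the counters are independent statistics rather than synchronization points.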

examples/main.cpp

Lines changed: 5 additions & 1 deletion
@@ -204,7 +204,11 @@ int main()
         "postgres", // db
         "password", // password
         /*max_pool_size*/ 32,
-        /*queue_capacity*/ 64
+        /*queue_capacity*/ 64,
+        usub::pg::PgPoolHealthConfig{
+            .enabled = true,
+            .interval_ms = 3000
+        }
     );
 
     uvent.for_each_thread([&](int threadIndex, thread::ThreadLocalStorage* tls)

include/upq/PgHealthChecker.h

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+#ifndef PGHEALTHCHECKER_H
+#define PGHEALTHCHECKER_H
+
+#include <cstdint>
+#include <atomic>
+#include <memory>
+#include <chrono>
+
+#include "uvent/Uvent.h"
+#include "upq/PgPool.h"
+#include "upq/PgConnection.h"
+#include "upq/PgTypes.h"
+
+namespace usub::pg
+{
+    struct PgHealthStats
+    {
+        std::atomic<uint64_t> iterations{0};
+        std::atomic<uint64_t> ok_checks{0};
+        std::atomic<uint64_t> failed_checks{0};
+    };
+
+    class PgHealthChecker
+    {
+    public:
+        PgHealthChecker(PgPool& pool, PgPoolHealthConfig cfg = {});
+
+        PgPoolHealthConfig& config();
+        const PgPoolHealthConfig& config() const;
+
+        PgHealthStats& stats();
+        const PgHealthStats& stats() const;
+
+        usub::uvent::task::Awaitable<void> run();
+
+    private:
+        PgPool& pool_;
+        PgPoolHealthConfig cfg_;
+        PgHealthStats stats_;
+    };
+} // namespace usub::pg
+
+#endif // PGHEALTHCHECKER_H
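
Review note: `PgHealthStats` holds `std::atomic` members, so it cannot be copied, and callers that want to log or export the numbers may prefer a plain-value copy. The sketch below is one possible way to do that and is an assumption, not part of upq: `HealthCounters` mirrors the struct above, and `HealthSnapshot`, `take_snapshot`, and `failure_ratio` are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Same layout as PgHealthStats in the header above.
struct HealthCounters
{
    std::atomic<std::uint64_t> iterations{0};
    std::atomic<std::uint64_t> ok_checks{0};
    std::atomic<std::uint64_t> failed_checks{0};
};

// Hypothetical plain-value copy of the counters.
struct HealthSnapshot
{
    std::uint64_t iterations;
    std::uint64_t ok_checks;
    std::uint64_t failed_checks;
};

// Reads each counter once. The three loads are not one atomic unit,
// so a snapshot taken mid-check may show iterations > ok + failed.
inline HealthSnapshot take_snapshot(const HealthCounters& c)
{
    return HealthSnapshot{
        c.iterations.load(std::memory_order_relaxed),
        c.ok_checks.load(std::memory_order_relaxed),
        c.failed_checks.load(std::memory_order_relaxed)};
}

// Share of finished checks that failed; 0.0 when none have finished.
inline double failure_ratio(const HealthSnapshot& s)
{
    const std::uint64_t finished = s.ok_checks + s.failed_checks;
    if (finished == 0)
        return 0.0;
    return static_cast<double>(s.failed_checks) / static_cast<double>(finished);
}
```

The ratio is computed over finished checks rather than `iterations`, because the checker increments `iterations` before the probe completes.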

include/upq/PgPool.h

Lines changed: 33 additions & 22 deletions
@@ -16,6 +16,14 @@
 
 namespace usub::pg
 {
+    struct PgPoolHealthConfig
+    {
+        bool enabled{false};
+        uint64_t interval_ms{600000};
+    };
+
+    class PgHealthChecker;
+
     class PgPool
     {
     public:
@@ -25,15 +33,17 @@ namespace usub::pg
                std::string db,
                std::string password,
                size_t max_pool_size = 32,
-               size_t queue_capacity = 64);
+               size_t queue_capacity = 64,
+               PgPoolHealthConfig health_cfg = {});
 
         static void init_global(const std::string& host,
                                 const std::string& port,
                                 const std::string& user,
                                 const std::string& db,
                                 const std::string& password,
                                 size_t max_pool_size = 32,
-                                size_t queue_capacity = 64);
+                                size_t queue_capacity = 64,
+                                PgPoolHealthConfig health_cfg = {});
 
         static PgPool& instance();
 
@@ -52,30 +62,26 @@ namespace usub::pg
         usub::uvent::task::Awaitable<QueryResult>
         query_awaitable(const std::string& sql, Args&&... args);
 
-        inline std::string host()
-        {
-            return this->host_;
-        }
+        inline std::string host() { return this->host_; }
+        inline std::string port() { return this->port_; }
+        inline std::string user() { return this->user_; }
+        inline std::string db() { return this->db_; }
+        inline std::string password() { return this->password_; }
 
-        inline std::string port()
-        {
-            return this->port_;
-        }
+        inline PgPoolHealthConfig health_cfg() const { return this->health_cfg_; }
 
-        inline std::string user()
-        {
-            return this->user_;
-        }
+        PgHealthChecker& health_checker();
 
-        inline std::string db()
-        {
-            return this->db_;
-        }
+        void mark_dead(std::shared_ptr<PgConnectionLibpq> const& conn);
 
-        inline std::string password()
+        struct HealthStats
         {
-            return this->password_;
-        }
+            std::atomic<uint64_t> checked{0};
+            std::atomic<uint64_t> alive{0};
+            std::atomic<uint64_t> reconnected{0};
+        };
+
+        inline HealthStats& health_stats() { return stats_; }
 
     private:
         std::string host_;
@@ -89,6 +95,12 @@ namespace usub::pg
         size_t max_pool_;
         std::atomic<size_t> live_count_;
 
+        PgPoolHealthConfig health_cfg_;
+
+        HealthStats stats_;
+
+        std::unique_ptr<PgHealthChecker> health_checker_;
+
         static std::unique_ptr<PgPool> instance_;
     };
 
@@ -138,7 +150,6 @@ namespace usub::pg
         release_connection(conn);
         co_return qr;
     }
-
 } // namespace usub::pg
 
 #endif // PGPOOL_H

src/upq/PgHealthChecker.cpp

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+#include "upq/PgHealthChecker.h"
+#include "uvent/system/SystemContext.h"
+
+namespace usub::pg
+{
+    PgHealthChecker::PgHealthChecker(PgPool& pool, PgPoolHealthConfig cfg)
+        : pool_(pool), cfg_(std::move(cfg))
+    {
+    }
+
+    PgPoolHealthConfig& PgHealthChecker::config() { return this->cfg_; }
+    const PgPoolHealthConfig& PgHealthChecker::config() const { return this->cfg_; }
+
+    PgHealthStats& PgHealthChecker::stats() { return this->stats_; }
+    const PgHealthStats& PgHealthChecker::stats() const { return this->stats_; }
+
+    usub::uvent::task::Awaitable<void> PgHealthChecker::run()
+    {
+        for (;;)
+        {
+            PgPoolHealthConfig cur_cfg = this->cfg_;
+
+            if (!cur_cfg.enabled)
+            {
+                co_await usub::uvent::system::this_coroutine::sleep_for(
+                    std::chrono::milliseconds(1000)
+                );
+                continue;
+            }
+
+            uint64_t interval = cur_cfg.interval_ms;
+            if (interval == 0)
+                interval = 1000;
+
+            this->stats_.iterations.fetch_add(1, std::memory_order_relaxed);
+
+            auto conn = co_await this->pool_.acquire_connection();
+
+            bool alive = false;
+            if (conn && conn->connected())
+            {
+                QueryResult ping = co_await conn->exec_simple_query_nonblocking("SELECT 1;");
+                if (ping.ok)
+                {
+                    alive = true;
+                    this->stats_.ok_checks.fetch_add(1, std::memory_order_relaxed);
+                }
+            }
+
+            if (!alive)
+            {
+                this->stats_.failed_checks.fetch_add(1, std::memory_order_relaxed);
+            }
+
+            this->pool_.release_connection(conn);
+
+            co_await usub::uvent::system::this_coroutine::sleep_for(
+                std::chrono::milliseconds(interval)
+            );
+        }
+
+        co_return;
+    }
+} // namespace usub::pg
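
Review note: the sleep durations chosen by `run()` can be isolated into a small pure function, which makes the fallback cases easier to see and to test. This is a sketch under stated assumptions rather than a proposed change: `HealthConfig` mirrors `PgPoolHealthConfig`, and `next_sleep` is a hypothetical helper that does not exist in upq.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Mirrors PgPoolHealthConfig, including its defaults.
struct HealthConfig
{
    bool enabled{false};
    std::uint64_t interval_ms{600000};
};

// Hypothetical helper: how long one pass of the loop sleeps.
inline std::chrono::milliseconds next_sleep(const HealthConfig& cfg)
{
    using std::chrono::milliseconds;

    // Disabled: no probe is run, and the config is re-read after one second.
    if (!cfg.enabled)
        return milliseconds(1000);

    // A zero interval falls back to one second instead of spinning.
    if (cfg.interval_ms == 0)
        return milliseconds(1000);

    return milliseconds(static_cast<milliseconds::rep>(cfg.interval_ms));
}
```

With the defaults, an enabled checker sleeps for 600000 ms between probes, which is ten minutes; the `interval_ms = 3000` used in `examples/main.cpp` yields a probe roughly every three seconds.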
