Commit cf5451a

Merge branch 'unstable' into feat-implement-TDIGEST.TRIMMED_MEAN-command

2 parents: 8b2bb67 + 792df36

41 files changed: 1129 additions & 125 deletions


.asf.yaml

Lines changed: 1 addition & 0 deletions

```diff
@@ -67,6 +67,7 @@ github:
     '2.12': {}
     '2.13': {}
     '2.14': {}
+    '2.15': {}

 notifications:
   commits: commits@kvrocks.apache.org
```

.github/workflows/kvrocks.yaml

Lines changed: 1 addition & 1 deletion

```diff
@@ -58,7 +58,7 @@ jobs:
     steps:
       - uses: actions/checkout@v4
       - name: Check typos
-        uses: crate-ci/typos@v1.42.0
+        uses: crate-ci/typos@v1.43.1
        with:
          config: .github/config/typos.toml
      - uses: apache/skywalking-eyes/header@v0.7.0
```

AGENTS.md

Lines changed: 165 additions & 0 deletions

New file:

# AGENTS.md

This file provides guidance to AI coding agents (e.g., Claude Code, Cursor, ChatGPT Codex, Gemini) when working with code in this repository.

While working on Apache Kvrocks, please remember:

- Always use English in code and comments.
- Only add meaningful comments when the code's behavior is difficult to understand.
- Add or update tests to cover externally observable behavior and regressions when you change or add functionality.
- Always run the formatter before submitting changes.

## Build and Development Commands

### Building

```bash
# Build kvrocks and utilities
./x.py build                           # Build to ./build directory
./x.py build -j N                      # Build with N parallel jobs
./x.py build --unittest                # Build with unit tests
./x.py build -DENABLE_OPENSSL=ON       # Build with TLS support
./x.py build --ninja                   # Use Ninja build system
./x.py build --skip-build              # Only run CMake configure
./x.py build -DCMAKE_BUILD_TYPE=Debug  # Debug build

# Run a local server
./build/kvrocks -c kvrocks.conf

# Fetch dependencies
./x.py fetch-deps                      # Fetch dependency archives
```

### Testing

```bash
# Build and run C++ unit tests
./x.py build --unittest
./x.py test cpp

# Run Go integration tests
./x.py test go

# Run specific Go test by path
./x.py test go tests/gocase/unit/...
```

### Lint

You must run the formatter and linters before submitting code changes. This ensures code quality and consistency across the project. It requires installing `clang-format`, `clang-tidy`, and `golangci-lint` locally. Please refer to CONTRIBUTING.md for setup instructions.

```bash
# Format code (must pass before submitting)
./x.py format

# Check code format (fails if not formatted)
./x.py check format

# Run clang-tidy
./x.py check tidy

# Run golangci-lint for Go tests
./x.py check golangci-lint
```

## Architecture Overview

Apache Kvrocks is a distributed key-value NoSQL database compatible with the Redis protocol, using RocksDB as its storage engine.

### Core Structure

- **`src/server/`**: Main server orchestration, connection handling, and worker threads. The `Server` class manages the event loop, worker threads, and coordinates all components.
- **`src/storage/`**: RocksDB integration layer. Key classes:
  - `Storage`: Manages the RocksDB instance, column families, and write batches
  - `Context`: Passes snapshot and batch between APIs for transactional consistency
- **`src/commands/`**: Redis protocol command implementations. Each command type has a corresponding `Commander` subclass.
- **`src/types/`**: Redis data structure implementations (String, Hash, List, Set, ZSet, Stream, etc.)
- **`src/cluster/`**: Cluster management, slot migration, and replication.
- **`src/search/`**: Full-text search and vector search (HNSW) implementation.
- **`src/config/`**: Server configuration parsing and management.
- **`src/cli/`**: Command-line interface utilities.
- **`src/common/`**: Shared utilities and helper functions.
- **`src/stats/`**: Statistics and metrics collection.

### Key Patterns

- **Column Families**: 8 column families are used - PrimarySubkey, Metadata, SecondarySubkey, PubSub, Propagate, Stream, Search, Index.
- **Command Registration**: Commands are registered via the `REDIS_REGISTER_COMMANDS` macro with flags like `kCmdWrite`, `kCmdReadOnly`, `kCmdBlocking`, etc.
- **Write Batch with Index**: Used for transactional mode to group writes before commit.
- **Worker Thread Model**: Libevent-based async I/O with dedicated worker threads.
- **Namespace Isolation**: Token-based multi-tenancy using the `__namespace` column family.

### Data Encoding

- `METADATA_ENCODING_VERSION=1` (default): Encodes 64-bit size and expire time in milliseconds.
- `METADATA_ENCODING_VERSION=0`: Legacy encoding.

Refer to https://kvrocks.apache.org/community/data-structure-on-rocksdb for more details.

## Coding Style and Naming Conventions

- C++ formatting follows `.clang-format` (Google-based, 2-space indent, 120-column limit, sorted includes).
- Use `.cc`/`.h` file extensions with `snake_case` filenames.
- Types use `PascalCase`; match existing patterns in nearby code.
- Use existing utilities and helper functions when possible; avoid reinventing the wheel.
- Go code should stay `gofmt`-clean and comply with `tests/gocase/.golangci.yml`.

## Testing Guidelines

Provide Go tests for integration-level verification of command behaviors and C++ unit tests for internal logic. Focus on testing new features or bug fixes, and avoid adding tests that don't verify meaningful behavior changes.

- **Go Integration Tests** (`tests/gocase/`): Use `*_test.go` files, organized by feature (unit/, integration/, tls/).
- **C++ Unit Tests** (`tests/cppunit/`): Use `*_test.cc` files with the GoogleTest framework.
- Add or update tests alongside behavior changes.
- Prefer focused unit tests; add integration coverage when commands or replication/storage behaviors change.
- Use `./x.py test ...` entry points for consistent setup.

## Commit Message Format

Use conventional commits with a scope indicating the affected component:

```
feat(rdb): add DUMP support for SortedInt type
fix(replication): prevent WAL exhaustion from slow consumers
fix(string): add empty string value check for INCR to match Redis behavior
perf(hash): use MultiGet to reduce RocksDB calls in HMSET
chore(deps): Bump rocksdb to v10.10.1
chore(ci): bump crate-ci/typos action to v1.43.1
chore(tests): replace to slices.Reverse() in go test
```

Common scopes: `server`, `storage`, `commands`, `cluster`, `search`, `types`, `replication`, `rdb`, `stream`, `hash`, `string`, `list`, `set`, `zset`, `deps`, `ci`, `tests`, `conf`.

## Common Tasks

### Adding a New Command

1. Create or update the command handler in `src/commands/`.
2. Implement the `Commander` subclass with `Parse()` and `Execute()` methods.
3. Register the command using the `REDIS_REGISTER_COMMANDS` macro with appropriate flags.
4. Add the underlying data operation in `src/types/` if needed.
5. Add C++ unit tests in `tests/cppunit/`.
6. Add Go integration tests in `tests/gocase/`.

### Adding a New Data Type

1. Implement the type in `src/types/` following existing patterns.
2. Define the metadata encoding in `src/storage/`.
3. Add command handlers in `src/commands/`.
4. Register commands with the `REDIS_REGISTER_COMMANDS` macro.
5. Add tests for both the type operations and command handlers.

### Debugging

1. Check server logs (configurable log level in the kvrocks config).
2. Use the `DEBUG` command for runtime inspection.
3. Use sanitizers via build flags for memory and thread issues.
4. Refer to `tests/lsan-suppressions` and `tests/tsan-suppressions` for known suppression rules.

## Important Notes

- Kvrocks aims for Redis protocol compatibility; always verify behavior against Redis when implementing or fixing commands.
- All changes must pass `./x.py check format` and `./x.py check tidy`.
- Don't change public command behavior unless requested.
- RocksDB is the core storage dependency; be cautious with storage-layer changes.
- Adding a new column family breaks forward compatibility; avoid this if possible and prefer using existing column families.
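The "Adding a New Command" steps above revolve around the `Commander` pattern: parse arguments once, then execute against storage. As a minimal standalone sketch of that shape — the `Status` and `CommandEcho` types here are hypothetical stand-ins for illustration, not the actual kvrocks headers:

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for kvrocks' Status type, for illustration only.
struct Status {
  bool ok;
  std::string msg;
  static Status OK() { return {true, ""}; }
  static Status Error(std::string m) { return {false, std::move(m)}; }
};

// A Commander-style class: Parse() validates arguments up front,
// Execute() produces the reply. A real command would go through the
// storage layer instead of echoing its argument.
class CommandEcho {
 public:
  Status Parse(const std::vector<std::string> &args) {
    if (args.size() != 2) return Status::Error("wrong number of arguments");
    message_ = args[1];
    return Status::OK();
  }
  Status Execute(std::string *output) {
    *output = message_;
    return Status::OK();
  }

 private:
  std::string message_;
};
```

The split between `Parse()` and `Execute()` lets the server reject malformed requests before touching storage, which matches the registration flags (`kCmdWrite` etc.) gating execution separately from parsing.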

NOTICE

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,5 +1,5 @@
 Apache Kvrocks
-Copyright 2022-2025 The Apache Software Foundation
+Copyright 2022-2026 The Apache Software Foundation

 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
```

cmake/rocksdb.cmake

Lines changed: 2 additions & 2 deletions

```diff
@@ -26,8 +26,8 @@ endif()
 include(cmake/utils.cmake)

 FetchContent_DeclareGitHubWithMirror(rocksdb
-  facebook/rocksdb v10.9.1
-  MD5=06a521bf5749f73d0da29844f9ae6fca
+  facebook/rocksdb v10.10.1
+  MD5=dcef50080a4a6c0c0b4b77fd04c60502
 )

 FetchContent_GetProperties(jemalloc)
```

cmake/zlib.cmake

Lines changed: 2 additions & 2 deletions

```diff
@@ -20,8 +20,8 @@ include_guard()
 include(cmake/utils.cmake)

 FetchContent_DeclareGitHubWithMirror(zlib
-  zlib-ng/zlib-ng 2.3.2
-  MD5=7818ea3f3ad80873674faf500fd12a0d
+  zlib-ng/zlib-ng 2.3.3
+  MD5=a2c8df556b61266f100d331268123115
 )

 FetchContent_MakeAvailableWithArgs(zlib
```

kvrocks.conf

Lines changed: 15 additions & 0 deletions

```diff
@@ -231,6 +231,21 @@ replication-delay-bytes 16384
 # Default: 16 updates
 replication-delay-updates 16

+# Maximum sequence lag allowed before disconnecting a slow replica.
+# If a replica falls behind by more than this many sequences, the master will
+# disconnect it to prevent WAL exhaustion. The replica can then reconnect and
+# attempt partial sync (psync) if the sequence is still available.
+# Set to 0 to disable this check (default).
+# Default: 0 (disabled)
+max-replication-lag 0
+
+# Timeout in milliseconds for socket send operations to replicas.
+# If sending data to a replica blocks for longer than this timeout,
+# the connection will be dropped. This prevents the replication feed thread
+# from blocking indefinitely on slow consumers.
+# Default: 30000 (30 seconds)
+replication-send-timeout-ms 30000
+
 # TCP listen() backlog.
 #
 # In high requests-per-second environments you need an high backlog in order
```
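The semantics documented for `max-replication-lag` can be captured in a small decision function. This is a sketch of the documented behavior under a hypothetical helper name (`ShouldDisconnectReplica`), not the actual implementation:

```cpp
#include <cassert>
#include <cstdint>

// Decide whether a replica at replica_seq should be disconnected, given the
// master's latest WAL sequence number and the configured max-replication-lag.
// A limit of 0 disables the check, matching the default in kvrocks.conf.
bool ShouldDisconnectReplica(uint64_t latest_seq, uint64_t replica_seq, int64_t max_lag) {
  if (max_lag <= 0) return false;               // feature disabled
  if (latest_seq <= replica_seq) return false;  // replica is caught up
  return static_cast<int64_t>(latest_seq - replica_seq) > max_lag;
}
```

Note the check only ever triggers when the replica is strictly behind, so a replica exactly at the latest sequence is never dropped regardless of the limit.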

src/cluster/cluster.cc

Lines changed: 1 addition & 1 deletion

```diff
@@ -55,7 +55,7 @@ Cluster::Cluster(Server *srv, std::vector<std::string> binds, int port)
 // cluster data, so these commands should be executed exclusively, and ReadWriteLock
 // also can guarantee accessing data is safe.
 bool Cluster::SubCommandIsExecExclusive(const std::string &subcommand) {
-  std::array subcommands = {"setnodes", "setnodeid", "setslot", "import", "reset"};
+  std::array subcommands = {"setnodes", "setnodeid", "setslot", "import", "reset", "flushslots"};

   return std::any_of(std::begin(subcommands), std::end(subcommands),
                      [&subcommand](const std::string &val) { return util::EqualICase(val, subcommand); });
```
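The check pairs `std::any_of` with kvrocks' case-insensitive comparator so `CLUSTERX FLUSHSLOTS` matches regardless of casing. A self-contained sketch of the same pattern — with a local `EqualICase` stand-in, since the real helper lives in kvrocks' `util` namespace:

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cctype>
#include <string>

// Local stand-in for util::EqualICase: byte-wise case-insensitive comparison.
bool EqualICase(const std::string &a, const std::string &b) {
  return a.size() == b.size() &&
         std::equal(a.begin(), a.end(), b.begin(), [](unsigned char l, unsigned char r) {
           return std::tolower(l) == std::tolower(r);
         });
}

// Mirrors the updated list in the diff, including the new "flushslots" entry.
bool SubCommandIsExecExclusive(const std::string &subcommand) {
  std::array<std::string, 6> subcommands = {"setnodes", "setnodeid", "setslot",
                                            "import",   "reset",     "flushslots"};
  return std::any_of(subcommands.begin(), subcommands.end(),
                     [&subcommand](const std::string &val) { return EqualICase(val, subcommand); });
}
```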

src/cluster/replication.cc

Lines changed: 32 additions & 6 deletions

```diff
@@ -63,7 +63,9 @@ FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::
       next_repl_seq_(next_repl_seq),
       req_(srv),
       max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes),
-      max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {}
+      max_delay_updates_(srv->GetConfig()->max_replication_delay_updates),
+      max_replication_lag_(srv->GetConfig()->max_replication_lag),
+      send_timeout_ms_(srv->GetConfig()->replication_send_timeout_ms) {}

 Status FeedSlaveThread::Start() {
   auto s = util::CreateThread("feed-replica", [this] {
@@ -184,6 +186,21 @@ void FeedSlaveThread::loop() {
   while (!IsStopped()) {
     auto curr_seq = next_repl_seq_.load();

+    // Check replication lag - disconnect slow consumers before WAL is exhausted
+    // Skip check if max_replication_lag_ is 0 (feature disabled)
+    if (max_replication_lag_ > 0) {
+      auto latest_seq = srv_->storage->LatestSeqNumber();
+      if (latest_seq > curr_seq) {
+        auto lag = static_cast<int64_t>(latest_seq - curr_seq);
+        if (lag > max_replication_lag_) {
+          ERROR("Replication lag {} exceeds max allowed {} for slave {}:{}, disconnecting to prevent WAL exhaustion",
+                lag, max_replication_lag_, conn_->GetAnnounceIP(), conn_->GetListeningPort());
+          Stop();
+          return;
+        }
+      }
+    }
+
     if (!iter_ || !iter_->Valid()) {
       if (iter_) INFO("WAL was rotated, would reopen again");
       if (!srv_->storage->WALHasNewData(curr_seq) || !srv_->storage->GetWALIter(curr_seq, &iter_).IsOK()) {
@@ -221,10 +238,12 @@ void FeedSlaveThread::loop() {
       batches_bulk += redis::BulkString("_getack");
     }

-    // Send entire bulk which contain multiple batches
-    auto s = util::SockSend(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent());
+    // Send entire bulk which contain multiple batches with timeout
+    // This prevents blocking indefinitely on slow consumers
+    auto s = util::SockSendWithTimeout(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent(), send_timeout_ms_);
     if (!s.IsOK()) {
-      ERROR("Write error while sending batch to slave: {}. batches: 0x{}", s.Msg(), util::StringToHex(batches_bulk));
+      ERROR("Write error while sending batch to slave {}:{}: {}. batch_size={}", conn_->GetAnnounceIP(),
+            conn_->GetListeningPort(), s.Msg(), batches_bulk.size());
       Stop();
       return;
     }
@@ -260,9 +279,14 @@ void ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, int
   }
   if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
     ERROR("[replication] connection error/eof, reconnect the master");
-    // Wait a bit and reconnect
+    // Wait with exponential backoff before reconnecting
+    constexpr int kMaxBackoffSeconds = 60;
+    constexpr int kMaxShiftBits = 6;  // Cap shift to avoid UB; 2^6 = 64 then clamped to 60
     repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
-    std::this_thread::sleep_for(std::chrono::seconds(1));
+    int attempts = repl_->reconnect_attempts_.fetch_add(1, std::memory_order_relaxed);
+    int backoff_secs = std::min(1 << std::min(attempts, kMaxShiftBits), kMaxBackoffSeconds);
+    WARN("[replication] waiting {} seconds before reconnecting (attempt {})", backoff_secs, attempts + 1);
+    std::this_thread::sleep_for(std::chrono::seconds(backoff_secs));
     Stop();
     Start();
   }
@@ -634,6 +658,7 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
   } else {
     // PSYNC is OK, use IncrementBatchLoop
     INFO("[replication] PSync is ok, start increment batch loop");
+    reconnect_attempts_.store(0, std::memory_order_relaxed);  // Reset backoff counter on successful connection
     return CBState::NEXT;
   }
 }
@@ -879,6 +904,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
     return CBState::RESTART;
   }
   INFO("[replication] Succeeded restoring the backup, fullsync was finish");
+  reconnect_attempts_.store(0, std::memory_order_relaxed);  // Reset backoff counter on successful fullsync
   post_fullsync_cb_();

   // It needs to reload namespaces from DB after the full sync is done,
```
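The reconnect backoff in this file caps both the shift amount and the resulting delay. Isolated into a pure function (the constants are copied from the diff; the function name is ours), the arithmetic is:

```cpp
#include <algorithm>
#include <cassert>

// Capped exponential backoff: 1, 2, 4, ..., clamped to 60 seconds.
// The shift itself is capped at 6 bits so that `1 << attempts` never
// becomes undefined behavior for large attempt counts.
int BackoffSeconds(int attempts) {
  constexpr int kMaxBackoffSeconds = 60;
  constexpr int kMaxShiftBits = 6;  // 2^6 = 64, then clamped to 60
  return std::min(1 << std::min(attempts, kMaxShiftBits), kMaxBackoffSeconds);
}
```

So successive failures wait 1, 2, 4, 8, 16, 32, then 60 seconds from the seventh attempt onward; the counter resets to zero on a successful PSYNC or full sync, restoring the 1-second initial delay.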

src/cluster/replication.h

Lines changed: 3 additions & 0 deletions

```diff
@@ -91,6 +91,8 @@ class FeedSlaveThread {
   // Configurable delay limits
   size_t max_delay_bytes_;
   size_t max_delay_updates_;
+  int64_t max_replication_lag_;
+  int send_timeout_ms_;

   void loop();
   void checkLivenessIfNeed();
@@ -166,6 +168,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
   const bool replication_group_sync_ = false;
   std::atomic<int64_t> last_io_time_secs_ = 0;
   int64_t last_ack_time_secs_ = 0;
+  std::atomic<int> reconnect_attempts_ = 0;  // For exponential backoff on reconnection
   bool next_try_old_psync_ = false;
   bool next_try_without_announce_ip_address_ = false;
```
