Skip to content

Commit cffbd24

Browse files
authored
keep session info in RedisConnContext (#2902)
* keep session info in RedisConnContext * fix comment * add uint test for redis ctx * fix uinttest
1 parent 3e97efe commit cffbd24

File tree

7 files changed

+305
-55
lines changed

7 files changed

+305
-55
lines changed

example/BUILD.bazel

+11
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,14 @@ cc_binary(
123123
"//:brpc",
124124
],
125125
)
126+
127+
cc_binary(
128+
name = "redis_c++_server",
129+
srcs = [
130+
"redis_c++/redis_server.cpp",
131+
],
132+
copts = COPTS,
133+
deps = [
134+
"//:brpc",
135+
],
136+
)

example/redis_c++/redis_server.cpp

+102-12
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,54 @@
3131

3232
DEFINE_int32(port, 6379, "TCP Port of this server");
3333

34+
class AuthSession : public brpc::Destroyable {
35+
public:
36+
explicit AuthSession(const std::string& user_name, const std::string& password)
37+
: _user_name(user_name), _password(password) {}
38+
39+
void Destroy() override {
40+
delete this;
41+
}
42+
43+
const std::string _user_name;
44+
const std::string _password;
45+
};
46+
3447
class RedisServiceImpl : public brpc::RedisService {
3548
public:
36-
bool Set(const std::string& key, const std::string& value) {
49+
RedisServiceImpl() {
50+
_user_password["db1"] = "123456";
51+
_user_password["db2"] = "123456";
52+
_db_map["db1"].resize(kHashSlotNum);
53+
_db_map["db2"].resize(kHashSlotNum);
54+
}
55+
56+
bool Set(const std::string& db_name, const std::string& key, const std::string& value) {
3757
int slot = butil::crc32c::Value(key.c_str(), key.size()) % kHashSlotNum;
3858
_mutex[slot].lock();
39-
_map[slot][key] = value;
59+
auto& kv = _db_map[db_name];
60+
kv[slot][key] = value;
4061
_mutex[slot].unlock();
4162
return true;
4263
}
4364

44-
bool Get(const std::string& key, std::string* value) {
65+
bool Auth(const std::string& db_name, const std::string& password) {
66+
if (_user_password.find(db_name) == _user_password.end()) {
67+
return false;
68+
} else {
69+
if (_user_password[db_name] != password) {
70+
return false;
71+
}
72+
}
73+
return true;
74+
}
75+
76+
bool Get(const std::string& db_name, const std::string& key, std::string* value) {
4577
int slot = butil::crc32c::Value(key.c_str(), key.size()) % kHashSlotNum;
4678
_mutex[slot].lock();
47-
auto it = _map[slot].find(key);
48-
if (it == _map[slot].end()) {
79+
auto& kv = _db_map[db_name];
80+
auto it = kv[slot].find(key);
81+
if (it == kv[slot].end()) {
4982
_mutex[slot].unlock();
5083
return false;
5184
}
@@ -56,7 +89,9 @@ class RedisServiceImpl : public brpc::RedisService {
5689

5790
private:
5891
const static int kHashSlotNum = 32;
59-
std::unordered_map<std::string, std::string> _map[kHashSlotNum];
92+
typedef std::unordered_map<std::string, std::string> KVStore;
93+
std::unordered_map<std::string, std::vector<KVStore>> _db_map;
94+
std::unordered_map<std::string, std::string> _user_password;
6095
butil::Mutex _mutex[kHashSlotNum];
6196
};
6297

@@ -65,16 +100,27 @@ class GetCommandHandler : public brpc::RedisCommandHandler {
65100
explicit GetCommandHandler(RedisServiceImpl* rsimpl)
66101
: _rsimpl(rsimpl) {}
67102

68-
brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
103+
brpc::RedisCommandHandlerResult Run(brpc::RedisConnContext* ctx,
104+
const std::vector<butil::StringPiece>& args,
69105
brpc::RedisReply* output,
70106
bool /*flush_batched*/) override {
107+
108+
AuthSession* session = static_cast<AuthSession*>(ctx->get_session());
109+
if (session == nullptr) {
110+
output->FormatError("No auth session");
111+
return brpc::REDIS_CMD_HANDLED;
112+
}
113+
if (session->_user_name.empty()) {
114+
output->FormatError("No user name");
115+
return brpc::REDIS_CMD_HANDLED;
116+
}
71117
if (args.size() != 2ul) {
72118
output->FormatError("Expect 1 arg for 'get', actually %lu", args.size()-1);
73119
return brpc::REDIS_CMD_HANDLED;
74120
}
75121
const std::string key(args[1].data(), args[1].size());
76122
std::string value;
77-
if (_rsimpl->Get(key, &value)) {
123+
if (_rsimpl->Get(session->_user_name, key, &value)) {
78124
output->SetString(value);
79125
} else {
80126
output->SetNullString();
@@ -91,32 +137,76 @@ class SetCommandHandler : public brpc::RedisCommandHandler {
91137
explicit SetCommandHandler(RedisServiceImpl* rsimpl)
92138
: _rsimpl(rsimpl) {}
93139

94-
brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
140+
brpc::RedisCommandHandlerResult Run(brpc::RedisConnContext* ctx,
141+
const std::vector<butil::StringPiece>& args,
95142
brpc::RedisReply* output,
96143
bool /*flush_batched*/) override {
144+
AuthSession* session = static_cast<AuthSession*>(ctx->get_session());
145+
if (session == nullptr) {
146+
output->FormatError("No auth session");
147+
return brpc::REDIS_CMD_HANDLED;
148+
}
149+
if (session->_user_name.empty()) {
150+
output->FormatError("No user name");
151+
return brpc::REDIS_CMD_HANDLED;
152+
}
97153
if (args.size() != 3ul) {
98154
output->FormatError("Expect 2 args for 'set', actually %lu", args.size()-1);
99155
return brpc::REDIS_CMD_HANDLED;
100156
}
101157
const std::string key(args[1].data(), args[1].size());
102158
const std::string value(args[2].data(), args[2].size());
103-
_rsimpl->Set(key, value);
159+
_rsimpl->Set(session->_user_name, key, value);
104160
output->SetStatus("OK");
105161
return brpc::REDIS_CMD_HANDLED;
106162
}
107163

108164
private:
109-
RedisServiceImpl* _rsimpl;
165+
RedisServiceImpl* _rsimpl;
166+
};
167+
168+
169+
170+
class AuthCommandHandler : public brpc::RedisCommandHandler {
171+
public:
172+
explicit AuthCommandHandler(RedisServiceImpl* rsimpl)
173+
: _rsimpl(rsimpl) {}
174+
brpc::RedisCommandHandlerResult Run(brpc::RedisConnContext* ctx,
175+
const std::vector<butil::StringPiece>& args,
176+
brpc::RedisReply* output,
177+
bool /*flush_batched*/) override {
178+
if (args.size() != 3ul) {
179+
output->FormatError("Expect 2 args for 'auth', actually %lu", args.size()-1);
180+
return brpc::REDIS_CMD_HANDLED;
181+
}
182+
183+
const std::string db_name(args[1].data(), args[1].size());
184+
const std::string password(args[2].data(), args[2].size());
185+
186+
if (_rsimpl->Auth(db_name, password)) {
187+
output->SetStatus("OK");
188+
auto auth_session = new AuthSession(db_name, password);
189+
ctx->reset_session(auth_session);
190+
} else {
191+
output->FormatError("Invalid password for database '%s'", db_name.c_str());
192+
}
193+
return brpc::REDIS_CMD_HANDLED;
194+
}
195+
196+
private:
197+
RedisServiceImpl* _rsimpl;
110198
};
111199

112200
int main(int argc, char* argv[]) {
113201
google::ParseCommandLineFlags(&argc, &argv, true);
114202
RedisServiceImpl *rsimpl = new RedisServiceImpl;
115203
auto get_handler =std::unique_ptr<GetCommandHandler>(new GetCommandHandler(rsimpl));
116204
auto set_handler =std::unique_ptr<SetCommandHandler>( new SetCommandHandler(rsimpl));
205+
auto auth_handler = std::unique_ptr<AuthCommandHandler>(new AuthCommandHandler(rsimpl));
117206
rsimpl->AddCommandHandler("get", get_handler.get());
118207
rsimpl->AddCommandHandler("set", set_handler.get());
119-
208+
rsimpl->AddCommandHandler("auth", auth_handler.get());
209+
120210
brpc::Server server;
121211
brpc::ServerOptions server_options;
122212
server_options.redis_service = rsimpl;

src/brpc/policy/redis_protocol.cpp

+2-32
Original file line numberDiff line numberDiff line change
@@ -54,27 +54,6 @@ struct InputResponse : public InputMessageBase {
5454
}
5555
};
5656

57-
// This class is as parsing_context in socket.
58-
class RedisConnContext : public Destroyable {
59-
public:
60-
explicit RedisConnContext(const RedisService* rs)
61-
: redis_service(rs)
62-
, batched_size(0) {}
63-
64-
~RedisConnContext();
65-
// @Destroyable
66-
void Destroy() override;
67-
68-
const RedisService* redis_service;
69-
// If user starts a transaction, transaction_handler indicates the
70-
// handler pointer that runs the transaction command.
71-
std::unique_ptr<RedisCommandHandler> transaction_handler;
72-
// >0 if command handler is run in batched mode.
73-
int batched_size;
74-
75-
RedisCommandParser parser;
76-
butil::Arena arena;
77-
};
7857

7958
int ConsumeCommand(RedisConnContext* ctx,
8059
const std::vector<butil::StringPiece>& args,
@@ -83,7 +62,7 @@ int ConsumeCommand(RedisConnContext* ctx,
8362
RedisReply output(&ctx->arena);
8463
RedisCommandHandlerResult result = REDIS_CMD_HANDLED;
8564
if (ctx->transaction_handler) {
86-
result = ctx->transaction_handler->Run(args, &output, flush_batched);
65+
result = ctx->transaction_handler->Run(ctx, args, &output, flush_batched);
8766
if (result == REDIS_CMD_HANDLED) {
8867
ctx->transaction_handler.reset(NULL);
8968
} else if (result == REDIS_CMD_BATCHED) {
@@ -97,7 +76,7 @@ int ConsumeCommand(RedisConnContext* ctx,
9776
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", args[0].as_string().c_str());
9877
output.SetError(buf);
9978
} else {
100-
result = ch->Run(args, &output, flush_batched);
79+
result = ch->Run(ctx, args, &output, flush_batched);
10180
if (result == REDIS_CMD_CONTINUE) {
10281
if (ctx->batched_size != 0) {
10382
LOG(ERROR) << "CONTINUE should not be returned in a batched process.";
@@ -134,15 +113,6 @@ int ConsumeCommand(RedisConnContext* ctx,
134113
return 0;
135114
}
136115

137-
// ========== impl of RedisConnContext ==========
138-
139-
RedisConnContext::~RedisConnContext() { }
140-
141-
void RedisConnContext::Destroy() {
142-
delete this;
143-
}
144-
145-
// ========== impl of RedisConnContext ==========
146116

147117
ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
148118
bool read_eof, const void* arg) {

src/brpc/redis.cpp

+17
Original file line numberDiff line numberDiff line change
@@ -370,4 +370,21 @@ RedisCommandHandler* RedisCommandHandler::NewTransactionHandler() {
370370
return NULL;
371371
}
372372

373+
// ========== impl of RedisConnContext ==========
374+
RedisConnContext::~RedisConnContext() { }
375+
376+
void RedisConnContext::Destroy() {
377+
if (session) {
378+
session->Destroy();
379+
}
380+
delete this;
381+
}
382+
383+
void RedisConnContext::reset_session(Destroyable* s){
384+
if (session) {
385+
session->Destroy();
386+
}
387+
session = s;
388+
}
389+
373390
} // namespace brpc

src/brpc/redis.h

+44-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121

2222
#include <unordered_map>
2323

24+
#include "brpc/destroyable.h"
2425
#include "brpc/nonreflectable_message.h"
2526
#include "brpc/parse_result.h"
27+
#include "brpc/redis_command.h"
2628
#include "brpc/pb_compat.h"
2729
#include "brpc/redis_reply.h"
2830
#include "butil/arena.h"
@@ -210,6 +212,39 @@ enum RedisCommandHandlerResult {
210212
REDIS_CMD_BATCHED = 2,
211213
};
212214

215+
class RedisCommandParser;
216+
217+
// This class is as parsing_context in socket.
218+
class RedisConnContext : public Destroyable {
219+
public:
220+
explicit RedisConnContext(const RedisService* rs)
221+
: redis_service(rs)
222+
, batched_size(0)
223+
, session(nullptr) {}
224+
225+
~RedisConnContext();
226+
// @Destroyable
227+
void Destroy() override;
228+
void reset_session(Destroyable* s);
229+
230+
Destroyable* get_session() { return session; }
231+
232+
const RedisService* redis_service;
233+
// If user starts a transaction, transaction_handler indicates the
234+
// handler pointer that runs the transaction command.
235+
std::unique_ptr<RedisCommandHandler> transaction_handler;
236+
// >0 if command handler is run in batched mode.
237+
int batched_size;
238+
239+
RedisCommandParser parser;
240+
butil::Arena arena;
241+
242+
private:
243+
// If user is authenticated, session is set.
244+
// Keep auth session info in RedisConnContext to distinguish diffrent users( or diffrent db).
245+
Destroyable* session;
246+
};
247+
213248
// The Command handler for a redis request. User should impletement Run().
214249
class RedisCommandHandler {
215250
public:
@@ -235,8 +270,15 @@ class RedisCommandHandler {
235270
// it returns REDIS_CMD_HANDLED. Read the comment below.
236271
virtual RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
237272
brpc::RedisReply* output,
238-
bool flush_batched) = 0;
239-
273+
bool flush_batched) {
274+
return REDIS_CMD_HANDLED;
275+
};
276+
virtual RedisCommandHandlerResult Run(RedisConnContext* ctx,
277+
const std::vector<butil::StringPiece>& args,
278+
brpc::RedisReply* output,
279+
bool flush_batched) {
280+
return Run(args, output, flush_batched);
281+
}
240282
// The Run() returns CONTINUE for "multi", which makes brpc call this method to
241283
// create a transaction_handler to process following commands until transaction_handler
242284
// returns OK. For example, for command "multi; set k1 v1; set k2 v2; set k3 v3;

test/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ cc_test(
192192
],
193193
)
194194

195+
195196
cc_test(
196197
name = "bvar_test",
197198
srcs = glob(

0 commit comments

Comments
 (0)