|
7 | 7 | #include <storages/redis/impl/command.hpp> |
8 | 8 | #include <storages/redis/impl/secdist_redis.hpp> |
9 | 9 | #include <storages/redis/impl/sentinel.hpp> |
| 10 | +#include <storages/redis/impl/subscribe_sentinel.hpp> |
10 | 11 | #include <storages/redis/impl/thread_pools.hpp> |
| 12 | +#include <storages/redis/subscribe_client_impl.hpp> |
| 13 | +#include <userver/dynamic_config/test_helpers.hpp> |
| 14 | +#include <userver/storages/redis/subscribe_client.hpp> |
| 15 | +#include <userver/storages/redis/subscription_token.hpp> |
| 16 | + |
| 17 | +#include <userver/utest/utest.hpp> |
11 | 18 |
|
12 | 19 | USERVER_NAMESPACE_BEGIN |
13 | 20 |
|
@@ -44,6 +51,52 @@ bool IsConnected(const storages::redis::impl::Redis& redis) { |
44 | 51 | return redis.GetState() == storages::redis::RedisState::kConnected; |
45 | 52 | } |
46 | 53 |
|
| 54 | +struct MockSentinelServers { |
| 55 | + static constexpr size_t kRedisThreadCount = 1; |
| 56 | + static constexpr std::string_view kRedisName = "redis_name"; |
| 57 | + |
| 58 | + void RegisterSentinelMastersSlaves() { |
| 59 | + std::vector<MockRedisServer::SlaveInfo> slave_infos; |
| 60 | + std::string redis_name{kRedisName}; |
| 61 | + for (const auto& slave : slaves) { |
| 62 | + slave_infos.emplace_back(redis_name, kLocalhost, slave.GetPort()); |
| 63 | + } |
| 64 | + |
| 65 | + for (auto& sentinel : sentinels) { |
| 66 | + sentinel.RegisterSentinelMastersHandler({{redis_name, kLocalhost, masters[0].GetPort()}}); |
| 67 | + sentinel.RegisterSentinelSlavesHandler(redis_name, slave_infos); |
| 68 | + } |
| 69 | + } |
| 70 | + |
| 71 | + template <class Function> |
| 72 | + void ForEachServer(const Function& visitor) { |
| 73 | + for (auto& server : masters) { |
| 74 | + visitor(server); |
| 75 | + } |
| 76 | + for (auto& server : slaves) { |
| 77 | + visitor(server); |
| 78 | + } |
| 79 | + for (auto& server : sentinels) { |
| 80 | + visitor(server); |
| 81 | + } |
| 82 | + } |
| 83 | + |
| 84 | + MockRedisServer masters[1] = { |
| 85 | + MockRedisServer{"master0"}, |
| 86 | + }; |
| 87 | + MockRedisServer slaves[2] = { |
| 88 | + MockRedisServer{"slave0"}, |
| 89 | + MockRedisServer{"slave1"}, |
| 90 | + }; |
| 91 | + MockRedisServer sentinels[3] = { |
| 92 | + MockRedisServer{"sentinel0"}, |
| 93 | + MockRedisServer{"sentinel1"}, |
| 94 | + MockRedisServer{"sentinel2"}, |
| 95 | + }; |
| 96 | + std::shared_ptr<storages::redis::impl::ThreadPools> thread_pool = |
| 97 | + std::make_shared<storages::redis::impl::ThreadPools>(1, kRedisThreadCount); |
| 98 | +}; |
| 99 | + |
47 | 100 | } // namespace |
48 | 101 |
|
49 | 102 | TEST(Redis, NoPassword) { |
@@ -101,6 +154,124 @@ TEST(Redis, AuthTimeout) { |
101 | 154 | PeriodicCheck([&] { return !IsConnected(*redis); }); |
102 | 155 | } |
103 | 156 |
|
| 157 | +UTEST(Redis, SentinelAuth) { |
| 158 | + MockSentinelServers mock; |
| 159 | + mock.RegisterSentinelMastersSlaves(); |
| 160 | + mock.ForEachServer([](auto& server) { server.RegisterPingHandler(); }); |
| 161 | + auto& [masters, slaves, sentinels, thread_pool] = mock; |
| 162 | + |
| 163 | + secdist::RedisSettings settings; |
| 164 | + settings.shards = {std::string{MockSentinelServers::kRedisName}}; |
| 165 | + settings.sentinel_password = storages::redis::Password("pass"); |
| 166 | + settings.sentinels.reserve(std::size(sentinels)); |
| 167 | + for (const auto& sentinel : sentinels) { |
| 168 | + settings.sentinels.emplace_back(kLocalhost, sentinel.GetPort()); |
| 169 | + } |
| 170 | + |
| 171 | + std::vector<MockRedisServer::HandlerPtr> auth_handlers; |
| 172 | + auth_handlers.reserve(std::size(sentinels)); |
| 173 | + for (auto& sentinel : sentinels) { |
| 174 | + auth_handlers.push_back(sentinel.RegisterStatusReplyHandler("AUTH", "OK")); |
| 175 | + } |
| 176 | + std::vector<MockRedisServer::HandlerPtr> no_auth_handlers; |
| 177 | + no_auth_handlers.reserve(std::size(masters) + std::size(slaves)); |
| 178 | + for (auto& server : masters) { |
| 179 | + no_auth_handlers.push_back(server.RegisterStatusReplyHandler("AUTH", "FAIL")); |
| 180 | + } |
| 181 | + for (auto& server : slaves) { |
| 182 | + no_auth_handlers.push_back(server.RegisterStatusReplyHandler("AUTH", "FAIL")); |
| 183 | + } |
| 184 | + |
| 185 | + auto sentinel_client = storages::redis::impl::Sentinel::CreateSentinel( |
| 186 | + thread_pool, settings, "test_shard_group_name", dynamic_config::GetDefaultSource(), "test_client_name", {""} |
| 187 | + ); |
| 188 | + sentinel_client->WaitConnectedDebug(std::empty(slaves)); |
| 189 | + |
| 190 | + for (auto& handler : auth_handlers) { |
| 191 | + EXPECT_TRUE(handler->WaitForFirstReply(kSmallPeriod)); |
| 192 | + } |
| 193 | + |
| 194 | + for (auto& handler : no_auth_handlers) { |
| 195 | + EXPECT_FALSE(handler->WaitForFirstReply(kWaitPeriod)); |
| 196 | + } |
| 197 | + |
| 198 | + for (const auto& sentinel : sentinels) { |
| 199 | + EXPECT_TRUE(sentinel.WaitForFirstPingReply(kSmallPeriod)); |
| 200 | + } |
| 201 | +} |
| 202 | + |
| 203 | +// TODO: TAXICOMMON-10834. Looks like AUTH to sentinel is not sent in case of SUBSCRIBE |
| 204 | +UTEST(Redis, DISABLED_SentinelAuthSubscribe) { |
| 205 | + MockSentinelServers mock; |
| 206 | + mock.RegisterSentinelMastersSlaves(); |
| 207 | + mock.ForEachServer([](auto& server) { server.RegisterPingHandler(); }); |
| 208 | + auto& [masters, slaves, sentinels, thread_pool] = mock; |
| 209 | + // Sentinels do NOT receive SUBSCRIBE |
| 210 | + std::vector<MockRedisServer::HandlerPtr> subscribe_handlers; |
| 211 | + for (auto& server : masters) { |
| 212 | + subscribe_handlers.push_back(server.RegisterHandlerWithConstReply("SUBSCRIBE", 1)); |
| 213 | + } |
| 214 | + for (auto& server : slaves) { |
| 215 | + subscribe_handlers.push_back(server.RegisterHandlerWithConstReply("SUBSCRIBE", 1)); |
| 216 | + } |
| 217 | + |
| 218 | + secdist::RedisSettings settings; |
| 219 | + settings.shards = {std::string{MockSentinelServers::kRedisName}}; |
| 220 | + settings.sentinel_password = storages::redis::Password("pass"); |
| 221 | + settings.sentinels.reserve(std::size(sentinels)); |
| 222 | + for (const auto& sentinel : sentinels) { |
| 223 | + settings.sentinels.emplace_back(kLocalhost, sentinel.GetPort()); |
| 224 | + } |
| 225 | + |
| 226 | + std::vector<MockRedisServer::HandlerPtr> auth_handlers; |
| 227 | + auth_handlers.reserve(std::size(sentinels)); |
| 228 | + for (auto& sentinel : sentinels) { |
| 229 | + auth_handlers.push_back(sentinel.RegisterStatusReplyHandler("AUTH", "OK")); |
| 230 | + } |
| 231 | + std::vector<MockRedisServer::HandlerPtr> no_auth_handlers; |
| 232 | + no_auth_handlers.reserve(std::size(masters) + std::size(slaves)); |
| 233 | + for (auto& server : masters) { |
| 234 | + no_auth_handlers.push_back(server.RegisterStatusReplyHandler("AUTH", "FAIL")); |
| 235 | + } |
| 236 | + for (auto& server : slaves) { |
| 237 | + no_auth_handlers.push_back(server.RegisterStatusReplyHandler("AUTH", "FAIL")); |
| 238 | + } |
| 239 | + |
| 240 | + storages::redis::CommandControl cc{}; |
| 241 | + testsuite::RedisControl redis_control{}; |
| 242 | + auto subscribe_sentinel = storages::redis::impl::SubscribeSentinel::Create( |
| 243 | + thread_pool, |
| 244 | + settings, |
| 245 | + "test_shard_group_name", |
| 246 | + dynamic_config::GetDefaultSource(), |
| 247 | + "test_client_name", |
| 248 | + {""}, |
| 249 | + cc, |
| 250 | + redis_control |
| 251 | + ); |
| 252 | + subscribe_sentinel->WaitConnectedDebug(std::empty(slaves)); |
| 253 | + std::shared_ptr<storages::redis::SubscribeClient> client = |
| 254 | + std::make_shared<storages::redis::SubscribeClientImpl>(std::move(subscribe_sentinel)); |
| 255 | + |
| 256 | + storages::redis::SubscriptionToken::OnMessageCb callback = [](const std::string& channel, |
| 257 | + const std::string& message) { |
| 258 | + EXPECT_TRUE(false) << "Should not be called. Channel = " << channel << ", message = " << message; |
| 259 | + }; |
| 260 | + auto subscription = client->Subscribe("channel_name", std::move(callback)); |
| 261 | + |
| 262 | + for (auto& handler : subscribe_handlers) { |
| 263 | + EXPECT_TRUE(handler->WaitForFirstReply(utest::kMaxTestWaitTime)); |
| 264 | + } |
| 265 | + |
| 266 | + for (auto& handler : auth_handlers) { |
| 267 | + EXPECT_TRUE(handler->WaitForFirstReply(kSmallPeriod)); |
| 268 | + } |
| 269 | + |
| 270 | + for (auto& handler : no_auth_handlers) { |
| 271 | + EXPECT_FALSE(handler->WaitForFirstReply(kWaitPeriod)); |
| 272 | + } |
| 273 | +} |
| 274 | + |
104 | 275 | TEST(Redis, Select) { |
105 | 276 | MockRedisServer server{"redis_db"}; |
106 | 277 | auto ping_handler = server.RegisterPingHandler(); |
|
0 commit comments