@@ -83,7 +83,11 @@ redisReply* RedisClient::executeCommand(const char* format, ...) const
8383{
8484 va_list ap;
8585 va_start (ap, format);
86- auto result = d_executor->executeCommand (format, ap);
86+ auto connection = d_connection.getConnection ();
87+ auto result = static_cast <redisReply*>(redisvCommand (connection->get (), format, ap));
88+ if (connection->get ()->err != 0 ) {
89+ vinfolog (" Redis connection error %s" , connection->get ()->errstr );
90+ }
8791 va_end (ap);
8892 return result;
8993};
@@ -96,143 +100,14 @@ redisReply* RedisClient::executeCommandArgv(std::vector<std::string> args) const
96100 argv.push_back (args[i].data ());
97101 argvlen.push_back (args[i].length ());
98102 }
99- return d_executor->executeCommandArgv (args.size (), argv.data (), argvlen.data ());
100- };
101-
102- redisReply* RedisClient::DirectExecutor::executeCommand (const char * format, va_list ap) const
103- {
104103 auto connection = d_connection.getConnection ();
105- auto result = static_cast <redisReply*>(redisvCommand (connection->get (), format, ap ));
104+ auto result = static_cast <redisReply*>(redisCommandArgv (connection->get (), args. size (), argv. data (), argvlen. data () ));
106105 if (connection->get ()->err != 0 ) {
107106 vinfolog (" Redis connection error %s" , connection->get ()->errstr );
108107 }
109108 return result;
110- }
111-
112- redisReply* RedisClient::DirectExecutor::executeCommandArgv (int argc, const char ** argv, const size_t * argvlen) const
113- {
114- auto connection = d_connection.getConnection ();
115- auto result = static_cast <redisReply*>(redisCommandArgv (connection->get (), argc, argv, argvlen));
116- if (connection->get ()->err != 0 ) {
117- vinfolog (" Redis connection error %s" , connection->get ()->errstr );
118- }
119- return result;
120- }
121-
122- RedisClient::PipelineExecutor::PipelineExecutor (const std::string& url, uint32_t pipelineInterval) :
123- d_connection(url), d_interval(pipelineInterval)
124- {
125- auto [sender, receiver] = pdns::channel::createObjectQueue<PipelineCommand>();
126- d_pipelineSender = std::move (sender);
127- d_pipelineReceiver = std::move (receiver);
128-
129- d_thread = std::thread (&RedisClient::PipelineExecutor::maintenanceThread, this );
130- }
131-
132- redisReply* RedisClient::PipelineExecutor::executeCommandArgv (int argc, const char ** argv, const size_t * argvlen) const
133- {
134- char * command;
135- auto len = redisFormatCommandArgv (&command, argc, argv, argvlen);
136- if (len < 0 ) {
137- // TODO: handle formatting errors?
138- vinfolog (" Redis command formatting error" );
139- return nullptr ;
140- }
141-
142- return pipelineCommand (command, len);
143- }
144-
145- redisReply* RedisClient::PipelineExecutor::executeCommand (const char * format, va_list ap) const
146- {
147- char * command;
148- auto len = redisvFormatCommand (&command, format, ap);
149- if (len < 0 ) {
150- // TODO: handle formatting errors?
151- vinfolog (" Redis command formatting error" );
152- return nullptr ;
153- }
154-
155- return pipelineCommand (command, len);
156- }
157-
158- redisReply* RedisClient::PipelineExecutor::pipelineCommand (const char * command, size_t len) const
159- {
160- std::mutex mtx;
161- std::condition_variable cv;
162- std::unique_lock<std::mutex> lock (mtx);
163- std::shared_ptr<redisReply*> result = std::make_shared<redisReply*>(nullptr );
164- PipelineCommand::callback_t callback = [result, &cv](redisReply* reply) mutable {
165- *result = reply;
166- cv.notify_one ();
167- };
168- d_pipelineSender.send (std::make_unique<PipelineCommand>(PipelineCommand{
169- command,
170- len,
171- callback}));
172- cv.wait (lock);
173- return *result;
174- }
175-
176- void RedisClient::PipelineExecutor::maintenanceThread ()
177- {
178- setThreadName (" dnsdist/redis" );
179-
180- for (;;) {
181- if (d_exiting) {
182- break ;
183- }
184-
185- bool connected = true ;
186- if (d_connection.needsReconnect ()) {
187- connected = d_connection.reconnect ();
188- }
189-
190- if (connected) {
191- auto connection = d_connection.getConnection ();
192- std::list<PipelineCommand::callback_t > callbacks;
193- while (auto command = d_pipelineReceiver.receive ()) {
194- if (redisAppendFormattedCommand (connection->get (), command->get ()->command , command->get ()->length ) == REDIS_OK ) {
195- callbacks.push_back (command->get ()->callback );
196- }
197- else {
198- if (connection->get ()->err != 0 ) {
199- vinfolog (" Redis connection error %s" , connection->get ()->errstr );
200- }
201- else {
202- vinfolog (" Unknown redis connection error" );
203- }
204- command->get ()->callback (nullptr );
205- }
206- }
207-
208- for (auto callback : callbacks) {
209- void * reply;
210- if (redisGetReply (connection->get (), &reply) == REDIS_OK ) {
211- callback (static_cast <redisReply*>(reply));
212- }
213- else {
214- if (connection->get ()->err != 0 ) {
215- vinfolog (" Redis connection error %s" , connection->get ()->errstr );
216- }
217- else {
218- vinfolog (" Unknown redis connection error" );
219- }
220- callback (nullptr );
221- }
222- }
223- }
224-
225- std::this_thread::sleep_for (std::chrono::milliseconds (d_interval));
226- }
227109};
228110
229- RedisClient::PipelineExecutor::~PipelineExecutor ()
230- {
231- d_exiting = true ;
232-
233- d_thread.join ();
234- }
235-
236111bool RedisKVClient::getValue (const std::string& key, std::string& value)
237112{
238113 auto reply = d_lookupAction->getValue (*d_client, key);
0 commit comments