Skip to content

Commit b64f2e5

Browse files
authored
Merge pull request #5389 from sysown/v3.0-5243
v3.0: harden query logging buffering follow-up (#5364 / #5243)
2 parents 9e157d2 + 5d78652 commit b64f2e5

15 files changed

+1662
-205
lines changed

include/MySQL_Logger.hpp

Lines changed: 77 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
#include "proxysql.h"
44
#include "cpp.h"
55
#include <atomic>
6+
#include <memory>
7+
#include <mutex>
8+
#include <unordered_map>
69

710
#ifndef PROXYJSON
811
#define PROXYJSON
@@ -11,6 +14,8 @@
1114

1215
#define PROXYSQL_LOGGER_PTHREAD_MUTEX
1316

17+
class LogBuffer;
18+
class LogBufferThreadContext;
1419
class MySQL_Logger;
1520

1621
struct p_ml_counter {
@@ -118,50 +123,48 @@ class MySQL_Event {
118123
~MySQL_Event();
119124

120125
/**
121-
* @brief Writes the event data to a file stream.
122-
* @param f A pointer to the file stream.
123-
* @param sess A pointer to the MySQL_Session object.
124-
* @return The total number of bytes written.
125-
*
126-
* This function writes the event data to the specified file stream based on the event type and the configured log format.
127-
*/
128-
uint64_t write(std::fstream* f, MySQL_Session* sess);
129-
130-
/**
131-
* @brief Writes the event data in binary format (format 1) to a file stream.
132-
* @param f A pointer to the file stream to write to. Must not be NULL.
133-
* @return The total number of bytes written to the stream.
134-
*
135-
* This function serializes the event data into a binary format according to the MySQL event log format 1 specification.
136-
* It encodes lengths using MySQL's length encoding scheme.
137-
* The function writes the event type, thread ID, username, schema name, client address, hostgroup ID (if available), server address (if available), timestamps, client statement ID (if applicable), affected rows, last insert ID, rows sent, query digest, and query string to the file stream.
138-
* The function writes all fields as defined by the MySQL event log format.
139-
* It handles variable-length fields using MySQL's length encoding, which means that the length of each field is written before the field data itself.
140-
* The function carefully handles potential errors during file writing operations.
141-
*/
142-
uint64_t write_query_format_1(std::fstream* f);
143-
144-
145-
/**
146-
* @brief Writes the event data in JSON format (format 2) to a file stream.
147-
* @param f A pointer to the file stream to write to. Must not be NULL.
148-
* @return The total number of bytes written to the stream (always 0 in the current implementation).
126+
* @brief Writes the event data to a LogBuffer.
127+
* @param f A pointer to LogBuffer to write to.
128+
* @param sess A pointer to the MySQL_Session object.
129+
* @return The total number of bytes written.
130+
*
131+
* This function writes the event data to the specified LogBuffer based on the event type and the configured log format.
132+
*/
133+
uint64_t write(LogBuffer* f, MySQL_Session* sess);
134+
135+
/**
136+
* @brief Writes the event data in binary format (format 1) to a LogBuffer.
137+
* @param f A pointer to the LogBuffer to write to.
138+
* @return The total number of bytes written.
139+
*
140+
* This function serializes the event data into a binary format according to the MySQL event log format 1 specification.
141+
* It encodes lengths using MySQL's length encoding scheme.
142+
* The function writes the event type, thread ID, username, schema name, client address, hostgroup ID (if available), server address (if available), timestamps, client statement ID (if applicable), affected rows, last insert ID, rows sent, query digest, and query string to the LogBuffer.
143+
* The function writes all fields as defined by the MySQL event log format.
144+
* It handles variable-length fields using MySQL's length encoding, which means that the length of each field is written before the field data itself.
145+
*/
146+
uint64_t write_query_format_1(LogBuffer* f);
147+
148+
/**
149+
* @brief Writes the event data in JSON format (format 2) to a LogBuffer.
150+
* @param f A pointer to the LogBuffer to write to. Must not be NULL.
151+
* @return The total number of bytes written (always 0 in the current implementation).
149152
*
150153
* This function serializes the event data into a JSON format.
151154
* It converts various data fields into a JSON object and writes this object to the file stream.
152155
* The function uses the nlohmann::json library for JSON serialization.
153156
* This function currently always returns 0.
154157
* The function constructs a JSON object containing relevant event information such as the hostgroup ID, thread ID, event type, username, schema name, client and server addresses, affected rows, last insert ID, rows sent, query string, timestamps, query digest, and client statement ID (if applicable).
155-
* After constructing the JSON object, it serializes it into a string using the `dump()` method of the nlohmann::json library and writes the resulting string to the output file stream.
158+
* After constructing the JSON object, it serializes it into a string using the `dump()` method of the nlohmann::json library and writes the resulting string to the LogBuffer.
156159
*/
157-
uint64_t write_query_format_2_json(std::fstream* f);
160+
uint64_t write_query_format_2_json(LogBuffer* f);
158161

159162
/**
160-
* @brief Writes authentication-related event data to a file stream.
161-
* @param f A pointer to the file stream.
163+
* @brief Writes authentication-related event data to a LogBuffer.
164+
* @param f A pointer to the LogBuffer to write to.
162165
* @param sess A pointer to the MySQL_Session object.
163166
*/
164-
void write_auth(std::fstream* f, MySQL_Session* sess);
167+
void write_auth(LogBuffer* f, MySQL_Session* sess);
165168

166169
/**
167170
* @brief Sets the client statement ID for the event.
@@ -333,8 +336,10 @@ class MySQL_Logger {
333336
char* base_filename; ///< Base filename for event log files. Memory managed by the class.
334337
char* datadir; ///< Directory for event log files. Memory managed by the class.
335338
unsigned int log_file_id; ///< ID of the current event log file.
339+
unsigned int current_log_size; ///< Current size of an event log file in bytes.
336340
unsigned int max_log_file_size; ///< Maximum size of an event log file in bytes.
337341
std::fstream* logfile; ///< File stream for event logging.
342+
std::atomic<bool> logfile_open{false}; ///< Atomic flag indicating if the logfile is currently open.
338343
} events;
339344

340345
/**
@@ -345,8 +350,10 @@ class MySQL_Logger {
345350
char* base_filename; ///< Base filename for audit log files. Memory managed by the class.
346351
char* datadir; ///< Directory for audit log files. Memory managed by the class.
347352
unsigned int log_file_id; ///< ID of the current audit log file.
353+
unsigned int current_log_size; ///< Current size of an audit log file in bytes.
348354
unsigned int max_log_file_size; ///< Maximum size of an audit log file in bytes.
349355
std::fstream* logfile; ///< File stream for audit logging.
356+
std::atomic<bool> logfile_open{false}; ///< Atomic flag indicating if the logfile is currently open.
350357
} audit;
351358

352359
/**
@@ -378,6 +385,7 @@ class MySQL_Logger {
378385
std::atomic<unsigned long long> totalEventsCopiedToDisk;
379386
std::atomic<unsigned long long> eventsAddedToBufferCount; ///< Total number of events added to the buffer.
380387
std::atomic<unsigned long long> eventsCurrentlyInBufferCount; ///< Number of events currently in the buffer.
388+
std::atomic<unsigned long long> totalQueriesLogged; ///< Total number of queries accepted by events logger.
381389
} metrics;
382390

383391
/**
@@ -386,6 +394,7 @@ class MySQL_Logger {
386394
struct {
387395
std::array<prometheus::Counter*, p_ml_counter::__size> p_counter_array {};
388396
std::array<prometheus::Gauge*, p_ml_gauge::__size> p_gauge_array {};
397+
prometheus::Counter* p_queries_logged_total { nullptr };
389398
} prom_metrics;
390399

391400
// Mutex or rwlock for thread safety
@@ -395,6 +404,40 @@ class MySQL_Logger {
395404
rwlock_t rwlock; ///< rwlock for thread safety.
396405
#endif
397406

407+
// Map to store per-thread logging contexts (one context per thread handles both events and audit)
408+
std::unordered_map<pthread_t, std::unique_ptr<LogBufferThreadContext>> log_thread_contexts;
409+
std::mutex log_thread_contexts_lock; ///< Mutex to protect the context map
410+
411+
/**
412+
* @brief Retrieves the logging context for the current thread.
413+
* @return Pointer to the thread's LogBufferThreadContext containing both event and audit state.
414+
*/
415+
LogBufferThreadContext* get_log_thread_context();
416+
417+
/**
418+
* @brief Checks if the events logfile is open.
419+
* @return True if the logfile is open, false otherwise.
420+
*/
421+
bool is_events_logfile_open() const;
422+
423+
/**
424+
* @brief Sets the open state of the events logfile.
425+
* @param open The new state (true for open, false for closed).
426+
*/
427+
void set_events_logfile_open(bool open);
428+
429+
/**
430+
* @brief Checks if the audit logfile is open.
431+
* @return True if the logfile is open, false otherwise.
432+
*/
433+
bool is_audit_logfile_open() const;
434+
435+
/**
436+
* @brief Sets the open state of the audit logfile.
437+
* @param open The new state (true for open, false for closed).
438+
*/
439+
void set_audit_logfile_open(bool open);
440+
398441
/**
399442
* @brief Closes the event log file. This function should only be called while holding the write lock.
400443
*/
@@ -513,7 +556,7 @@ class MySQL_Logger {
513556
/**
514557
* @brief Flushes the log files.
515558
*/
516-
void flush();
559+
void flush(bool force = false);
517560

518561
/**
519562
* @brief Acquires a write lock.

include/MySQL_Thread.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,8 +550,13 @@ class MySQL_Threads_Handler
550550
int eventslog_default_log;
551551
int eventslog_format;
552552
int eventslog_stmt_parameters;
553+
int eventslog_flush_timeout;
554+
int eventslog_flush_size;
555+
int eventslog_rate_limit;
553556
char *auditlog_filename;
554557
int auditlog_filesize;
558+
int auditlog_flush_timeout;
559+
int auditlog_flush_size;
555560
// SSL related, proxy to server
556561
char * ssl_p2s_ca;
557562
char * ssl_p2s_capath;

include/PgSQL_Logger.hpp

Lines changed: 64 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@
55
#include <atomic>
66
#include <array>
77
#include <deque>
8+
#include <memory>
89
#include <mutex>
910
#include <string>
1011
#include <unordered_map>
1112
#include <vector>
1213

14+
class LogBuffer;
15+
class LogBufferThreadContext;
16+
1317
#define PROXYSQL_LOGGER_PTHREAD_MUTEX
1418

1519
/**
@@ -112,43 +116,43 @@ class PgSQL_Event {
112116
bool free_error_on_delete;
113117

114118
public:
115-
/**
116-
* @brief Builds an event object using session/query context.
117-
*/
118-
PgSQL_Event(PGSQL_LOG_EVENT_TYPE _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len);
119-
/**
120-
* @brief Deep-copy constructor used by circular buffer insertion.
121-
*/
122-
PgSQL_Event(const PgSQL_Event& other);
123-
/**
124-
* @brief Copy assignment is disabled to prevent shallow pointer ownership bugs.
125-
*/
126-
PgSQL_Event& operator=(const PgSQL_Event&) = delete;
127-
/**
128-
* @brief Frees event-owned allocations for deep-copied instances.
129-
*/
130-
~PgSQL_Event();
131-
/**
132-
* @brief Serializes this event into the configured events/audit file format.
133-
* @return Number of bytes written for binary format, 0 for JSON format.
134-
*/
135-
uint64_t write(std::fstream *f, PgSQL_Session *sess);
136-
/**
137-
* @brief Writes binary format payload for query events.
138-
*/
139-
uint64_t write_query_format_1(std::fstream *f);
140-
/**
141-
* @brief Writes JSON format payload for query events.
142-
*/
143-
uint64_t write_query_format_2_json(std::fstream *f);
144-
/**
145-
* @brief Writes JSON payload for authentication/audit events.
146-
*/
147-
void write_auth(std::fstream *f, PgSQL_Session *sess);
148-
/**
149-
* @brief Sets client prepared statement name associated with this event.
150-
*/
151-
void set_client_stmt_name(char* client_stmt_name);
119+
/**
120+
* @brief Builds an event object using session/query context.
121+
*/
122+
PgSQL_Event(PGSQL_LOG_EVENT_TYPE _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len);
123+
/**
124+
* @brief Deep-copy constructor used by circular buffer insertion.
125+
*/
126+
PgSQL_Event(const PgSQL_Event& other);
127+
/**
128+
* @brief Copy assignment is disabled to prevent shallow pointer ownership bugs.
129+
*/
130+
PgSQL_Event& operator=(const PgSQL_Event&) = delete;
131+
/**
132+
* @brief Frees event-owned allocations for deep-copied instances.
133+
*/
134+
~PgSQL_Event();
135+
/**
136+
* @brief Serializes this event into the configured events/audit file format.
137+
* @return Number of bytes written for binary format, 0 for JSON format.
138+
*/
139+
uint64_t write(LogBuffer *f, PgSQL_Session *sess);
140+
/**
141+
* @brief Writes binary format payload for query events.
142+
*/
143+
uint64_t write_query_format_1(LogBuffer *f);
144+
/**
145+
* @brief Writes JSON format payload for query events.
146+
*/
147+
uint64_t write_query_format_2_json(LogBuffer *f);
148+
/**
149+
* @brief Writes JSON payload for authentication/audit events.
150+
*/
151+
void write_auth(LogBuffer *f, PgSQL_Session *sess);
152+
/**
153+
* @brief Sets client prepared statement name associated with this event.
154+
*/
155+
void set_client_stmt_name(char* client_stmt_name);
152156
/**
153157
* @brief Sets event query text and effective query length.
154158
*/
@@ -235,21 +239,24 @@ class PgSQL_Logger {
235239
char *base_filename;
236240
char *datadir;
237241
unsigned int log_file_id;
242+
unsigned int current_log_size;
238243
unsigned int max_log_file_size;
239244
std::fstream *logfile;
245+
std::atomic<bool> logfile_open{false};
240246
} events;
241247
struct {
242248
bool enabled;
243249
char *base_filename;
244250
char *datadir;
245251
unsigned int log_file_id;
252+
unsigned int current_log_size;
246253
unsigned int max_log_file_size;
247254
std::fstream *logfile;
255+
std::atomic<bool> logfile_open{false};
248256
} audit;
249-
250-
/**
251-
* @brief Accumulated runtime metrics for PostgreSQL advanced events logging.
252-
*/
257+
/**
258+
* @brief Accumulated runtime metrics for PostgreSQL advanced events logging.
259+
*/
253260
struct EventLogMetrics {
254261
std::atomic<unsigned long long> memoryCopyCount;
255262
std::atomic<unsigned long long> diskCopyCount;
@@ -262,16 +269,25 @@ class PgSQL_Logger {
262269
std::atomic<unsigned long long> totalEventsCopiedToDisk;
263270
} metrics;
264271

265-
struct {
266-
std::array<prometheus::Counter*, p_pl_counter::__size> p_counter_array {};
267-
std::array<prometheus::Gauge*, p_pl_gauge::__size> p_gauge_array {};
268-
} prom_metrics;
269-
272+
struct {
273+
std::atomic<unsigned long long> total_queries_logged { 0 };
274+
prometheus::Counter* p_queries_logged_total { nullptr };
275+
std::array<prometheus::Counter*, p_pl_counter::__size> p_counter_array {};
276+
std::array<prometheus::Gauge*, p_pl_gauge::__size> p_gauge_array {};
277+
} prom_metrics;
270278
#ifdef PROXYSQL_LOGGER_PTHREAD_MUTEX
271279
pthread_mutex_t wmutex;
272280
#else
273281
rwlock_t rwlock;
274282
#endif
283+
std::unordered_map<pthread_t, std::unique_ptr<LogBufferThreadContext>> log_thread_contexts;
284+
std::mutex log_thread_contexts_lock;
285+
286+
LogBufferThreadContext* get_log_thread_context();
287+
bool is_events_logfile_open() const;
288+
void set_events_logfile_open(bool open);
289+
bool is_audit_logfile_open() const;
290+
void set_audit_logfile_open(bool open);
275291
void events_close_log_unlocked();
276292
void events_open_log_unlocked();
277293
void audit_close_log_unlocked();
@@ -297,7 +313,8 @@ class PgSQL_Logger {
297313
void audit_set_base_filename();
298314
void log_request(PgSQL_Session *, PgSQL_Data_Stream *);
299315
void log_audit_entry(PGSQL_LOG_EVENT_TYPE, PgSQL_Session *, PgSQL_Data_Stream *, char *e = NULL);
300-
void flush();
316+
void flush(bool force = false);
317+
void p_update_metrics();
301318
void wrlock();
302319
void wrunlock();
303320
PgSQL_Logger_CircularBuffer* PgLogCB;
@@ -318,10 +335,6 @@ class PgSQL_Logger {
318335
*/
319336
std::unordered_map<std::string, unsigned long long> getAllMetrics() const;
320337

321-
/**
322-
* @brief Prometheus serial exposer update hook for PostgreSQL logger metrics.
323-
*/
324-
void p_update_metrics();
325338
};
326339

327340
#endif /* __CLASS_PGSQL_LOGGER_H */

include/PgSQL_Thread.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,8 +1034,13 @@ class PgSQL_Threads_Handler
10341034
int eventslog_buffer_max_query_length;
10351035
int eventslog_default_log;
10361036
int eventslog_format;
1037+
int eventslog_flush_timeout;
1038+
int eventslog_flush_size;
1039+
int eventslog_rate_limit;
10371040
char* auditlog_filename;
10381041
int auditlog_filesize;
1042+
int auditlog_flush_timeout;
1043+
int auditlog_flush_size;
10391044
// SSL related, proxy to server
10401045
char* ssl_p2s_ca;
10411046
char* ssl_p2s_capath;

0 commit comments

Comments
 (0)