Skip to content

Commit b6bc43b

Browse files
committed
feat: jdbc log rolling
1 parent 5e87b68 commit b6bc43b

File tree

2 files changed

+451
-6
lines changed

2 files changed

+451
-6
lines changed

connectors-common/connector-core/src/main/java/io/tapdata/write/FileLogger.java

Lines changed: 255 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import java.util.concurrent.*;
1414
import java.util.concurrent.atomic.AtomicBoolean;
1515
import java.util.concurrent.atomic.AtomicLong;
16+
import java.util.stream.Stream;
17+
import java.util.zip.GZIPOutputStream;
1618

1719
/**
1820
* High-performance file logger for string messages with multi-threaded write support.
@@ -46,7 +48,10 @@ public class FileLogger implements AutoCloseable {
4648
private static final int DEFAULT_FLUSH_INTERVAL_MS = 1000;
4749
private static final int DEFAULT_MAX_FILE_SIZE_MB = 100;
4850
private static final String LOG_FILE_EXTENSION = ".log";
51+
private static final String COMPRESSED_EXTENSION = ".gz";
4952
private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
53+
private static final long DEFAULT_COMPRESS_INTERVAL_MS = 24 * 60 * 60 * 1000L; // 24 hours
54+
private static final int DEFAULT_RETAIN_DAYS = 7; // Keep compressed files for 7 days
5055

5156
// Instance fields
5257
private final String logDirectory;
@@ -56,14 +61,19 @@ public class FileLogger implements AutoCloseable {
5661
private final int flushIntervalMs;
5762
private final long maxFileSizeBytes;
5863
private final boolean autoTimestamp;
64+
private final boolean enableCompression;
65+
private final long compressIntervalMs;
66+
private final int retainDays;
5967

6068
// Runtime state
6169
private final BlockingQueue<LogEntry> logQueue;
6270
private final AtomicBoolean running;
6371
private final AtomicBoolean closed;
6472
private final AtomicLong totalLinesLogged;
6573
private final AtomicLong totalLinesDropped;
74+
private final AtomicLong totalFilesCompressed;
6675
private final Thread writerThread;
76+
private final ScheduledExecutorService compressExecutor;
6777
private final ThreadLocal<SimpleDateFormat> dateFormat;
6878

6979
// File handling
@@ -96,6 +106,9 @@ public static class Builder {
96106
private int flushIntervalMs = DEFAULT_FLUSH_INTERVAL_MS;
97107
private int maxFileSizeMB = DEFAULT_MAX_FILE_SIZE_MB;
98108
private boolean autoTimestamp = true;
109+
private boolean enableCompression = false;
110+
private long compressIntervalMs = DEFAULT_COMPRESS_INTERVAL_MS;
111+
private int retainDays = DEFAULT_RETAIN_DAYS;
99112

100113
public Builder logDirectory(String logDirectory) {
101114
this.logDirectory = logDirectory;
@@ -132,6 +145,46 @@ public Builder autoTimestamp(boolean autoTimestamp) {
132145
return this;
133146
}
134147

148+
/**
149+
* Enable automatic compression of old log files
150+
* @param enable true to enable compression
151+
* @return this builder
152+
*/
153+
public Builder enableCompression(boolean enable) {
154+
this.enableCompression = enable;
155+
return this;
156+
}
157+
158+
/**
159+
* Set compression interval in milliseconds
160+
* @param intervalMs interval in milliseconds (default: 24 hours)
161+
* @return this builder
162+
*/
163+
public Builder compressIntervalMs(long intervalMs) {
164+
this.compressIntervalMs = intervalMs;
165+
return this;
166+
}
167+
168+
/**
169+
* Set compression interval in hours
170+
* @param hours interval in hours
171+
* @return this builder
172+
*/
173+
public Builder compressIntervalHours(int hours) {
174+
this.compressIntervalMs = hours * 60 * 60 * 1000L;
175+
return this;
176+
}
177+
178+
/**
179+
* Set how many days to retain compressed files
180+
* @param days number of days to retain (default: 7)
181+
* @return this builder
182+
*/
183+
public Builder retainDays(int days) {
184+
this.retainDays = days;
185+
return this;
186+
}
187+
135188
public FileLogger build() throws IOException {
136189
return new FileLogger(this);
137190
}
@@ -155,12 +208,16 @@ private FileLogger(Builder builder) throws IOException {
155208
this.flushIntervalMs = builder.flushIntervalMs;
156209
this.maxFileSizeBytes = builder.maxFileSizeMB * 1024L * 1024L;
157210
this.autoTimestamp = builder.autoTimestamp;
211+
this.enableCompression = builder.enableCompression;
212+
this.compressIntervalMs = builder.compressIntervalMs;
213+
this.retainDays = builder.retainDays;
158214

159215
this.logQueue = new LinkedBlockingQueue<>(queueCapacity);
160216
this.running = new AtomicBoolean(false);
161217
this.closed = new AtomicBoolean(false);
162218
this.totalLinesLogged = new AtomicLong(0);
163219
this.totalLinesDropped = new AtomicLong(0);
220+
this.totalFilesCompressed = new AtomicLong(0);
164221
this.fileSequence = 0;
165222

166223
this.dateFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat(DATE_PATTERN));
@@ -180,8 +237,27 @@ private FileLogger(Builder builder) throws IOException {
180237
this.running.set(true);
181238
this.writerThread.start();
182239

183-
TapLogger.info(TAG, "FileLogger initialized: directory={}, prefix={}, queueCapacity={}, batchSize={}, flushIntervalMs={}, maxFileSizeMB={}",
184-
logDirectory, logFilePrefix, queueCapacity, batchSize, flushIntervalMs, builder.maxFileSizeMB);
240+
// Start compression scheduler if enabled
241+
if (enableCompression) {
242+
this.compressExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
243+
Thread t = new Thread(r, "FileLogger-Compressor-" + logFilePrefix);
244+
t.setDaemon(true);
245+
return t;
246+
});
247+
this.compressExecutor.scheduleAtFixedRate(
248+
this::compressOldLogs,
249+
compressIntervalMs,
250+
compressIntervalMs,
251+
TimeUnit.MILLISECONDS
252+
);
253+
TapLogger.info(TAG, "FileLogger compression enabled: interval={}ms, retainDays={}",
254+
compressIntervalMs, retainDays);
255+
} else {
256+
this.compressExecutor = null;
257+
}
258+
259+
TapLogger.info(TAG, "FileLogger initialized: directory={}, prefix={}, queueCapacity={}, batchSize={}, flushIntervalMs={}, maxFileSizeMB={}, compression={}",
260+
logDirectory, logFilePrefix, queueCapacity, batchSize, flushIntervalMs, builder.maxFileSizeMB, enableCompression);
185261
}
186262

187263
/**
@@ -371,6 +447,158 @@ public void forceFlush() {
371447
flush();
372448
}
373449

450+
/**
451+
* Compress old log files and delete expired compressed files
452+
* This method is called periodically by the compression scheduler
453+
*/
454+
private void compressOldLogs() {
455+
compressOldLogs(false);
456+
}
457+
458+
/**
459+
* Compress old log files and delete expired compressed files
460+
* @param forceAll if true, compress all non-current log files regardless of age
461+
*/
462+
private void compressOldLogs(boolean forceAll) {
463+
try {
464+
Path logDir = Paths.get(logDirectory);
465+
if (!Files.exists(logDir)) {
466+
return;
467+
}
468+
469+
long now = System.currentTimeMillis();
470+
long compressThreshold = now - compressIntervalMs;
471+
long deleteThreshold = now - (retainDays * 24L * 60L * 60L * 1000L);
472+
473+
try (Stream<Path> files = Files.list(logDir)) {
474+
files.filter(path -> {
475+
String fileName = path.getFileName().toString();
476+
// Only process log files with our prefix
477+
return fileName.startsWith(logFilePrefix) &&
478+
fileName.endsWith(LOG_FILE_EXTENSION) &&
479+
!path.equals(currentLogFile); // Don't compress current file
480+
})
481+
.forEach(logFile -> {
482+
try {
483+
long lastModified = Files.getLastModifiedTime(logFile).toMillis();
484+
485+
// Compress old log files
486+
if (forceAll || lastModified < compressThreshold) {
487+
compressLogFile(logFile);
488+
}
489+
} catch (Exception e) {
490+
TapLogger.error(TAG, "Error processing log file {}: {}",
491+
logFile.getFileName(), e.getMessage(), e);
492+
}
493+
});
494+
}
495+
496+
// Delete expired compressed files
497+
try (Stream<Path> files = Files.list(logDir)) {
498+
files.filter(path -> {
499+
String fileName = path.getFileName().toString();
500+
return fileName.startsWith(logFilePrefix) &&
501+
fileName.endsWith(COMPRESSED_EXTENSION);
502+
})
503+
.forEach(compressedFile -> {
504+
try {
505+
long lastModified = Files.getLastModifiedTime(compressedFile).toMillis();
506+
507+
if (lastModified < deleteThreshold) {
508+
Files.delete(compressedFile);
509+
TapLogger.info(TAG, "Deleted expired compressed file: {}",
510+
compressedFile.getFileName());
511+
}
512+
} catch (Exception e) {
513+
TapLogger.error(TAG, "Error deleting compressed file {}: {}",
514+
compressedFile.getFileName(), e.getMessage(), e);
515+
}
516+
});
517+
}
518+
519+
} catch (Exception e) {
520+
TapLogger.error(TAG, "Error in compressOldLogs: {}", e.getMessage(), e);
521+
}
522+
}
523+
524+
/**
525+
* Compress a single log file using GZIP
526+
*/
527+
private void compressLogFile(Path logFile) {
528+
Path compressedFile = Paths.get(logFile.toString() + COMPRESSED_EXTENSION);
529+
530+
try {
531+
// Skip if already compressed
532+
if (Files.exists(compressedFile)) {
533+
TapLogger.debug(TAG, "Compressed file already exists: {}", compressedFile.getFileName());
534+
return;
535+
}
536+
537+
long originalSize = Files.size(logFile);
538+
539+
// Compress the file
540+
try (InputStream in = Files.newInputStream(logFile);
541+
OutputStream out = Files.newOutputStream(compressedFile);
542+
GZIPOutputStream gzipOut = new GZIPOutputStream(out)) {
543+
544+
byte[] buffer = new byte[8192];
545+
int len;
546+
while ((len = in.read(buffer)) > 0) {
547+
gzipOut.write(buffer, 0, len);
548+
}
549+
}
550+
551+
long compressedSize = Files.size(compressedFile);
552+
double ratio = (1.0 - (double) compressedSize / originalSize) * 100;
553+
554+
// Delete original file after successful compression
555+
Files.delete(logFile);
556+
557+
totalFilesCompressed.incrementAndGet();
558+
559+
TapLogger.info(TAG, "Compressed log file: {} -> {} (saved {:.1f}%, {} -> {} bytes)",
560+
logFile.getFileName(), compressedFile.getFileName(),
561+
String.format("%.1f", ratio), originalSize, compressedSize);
562+
563+
} catch (Exception e) {
564+
TapLogger.error(TAG, "Error compressing log file {}: {}",
565+
logFile.getFileName(), e.getMessage(), e);
566+
567+
// Clean up partial compressed file on error
568+
try {
569+
if (Files.exists(compressedFile)) {
570+
Files.delete(compressedFile);
571+
}
572+
} catch (IOException cleanupError) {
573+
TapLogger.error(TAG, "Error cleaning up partial compressed file: {}",
574+
cleanupError.getMessage());
575+
}
576+
}
577+
}
578+
579+
/**
580+
* Manually trigger compression of old log files
581+
* This can be called by users to compress logs on demand
582+
* Only compresses files older than the compression interval
583+
*/
584+
public void compressNow() {
585+
compressNow(false);
586+
}
587+
588+
/**
589+
* Manually trigger compression of log files
590+
* @param forceAll if true, compress all non-current log files regardless of age
591+
*/
592+
public void compressNow(boolean forceAll) {
593+
if (!enableCompression) {
594+
TapLogger.warn(TAG, "Compression is not enabled");
595+
return;
596+
}
597+
598+
TapLogger.info(TAG, "Manual compression triggered (forceAll={})", forceAll);
599+
compressOldLogs(forceAll);
600+
}
601+
374602
/**
375603
* Rotate to a new log file
376604
*/
@@ -411,6 +639,7 @@ public LoggerStats getStats() {
411639
return new LoggerStats(
412640
totalLinesLogged.get(),
413641
totalLinesDropped.get(),
642+
totalFilesCompressed.get(),
414643
logQueue.size(),
415644
currentFileSize,
416645
currentLogFile != null ? currentLogFile.toString() : null
@@ -423,14 +652,16 @@ public LoggerStats getStats() {
423652
public static class LoggerStats {
424653
private final long totalLinesLogged;
425654
private final long totalLinesDropped;
655+
private final long totalFilesCompressed;
426656
private final int queueSize;
427657
private final long currentFileSize;
428658
private final String currentLogFile;
429659

430-
public LoggerStats(long totalLinesLogged, long totalLinesDropped, int queueSize,
431-
long currentFileSize, String currentLogFile) {
660+
public LoggerStats(long totalLinesLogged, long totalLinesDropped, long totalFilesCompressed,
661+
int queueSize, long currentFileSize, String currentLogFile) {
432662
this.totalLinesLogged = totalLinesLogged;
433663
this.totalLinesDropped = totalLinesDropped;
664+
this.totalFilesCompressed = totalFilesCompressed;
434665
this.queueSize = queueSize;
435666
this.currentFileSize = currentFileSize;
436667
this.currentLogFile = currentLogFile;
@@ -444,6 +675,10 @@ public long getTotalLinesDropped() {
444675
return totalLinesDropped;
445676
}
446677

678+
public long getTotalFilesCompressed() {
679+
return totalFilesCompressed;
680+
}
681+
447682
public int getQueueSize() {
448683
return queueSize;
449684
}
@@ -458,8 +693,8 @@ public String getCurrentLogFile() {
458693

459694
@Override
460695
public String toString() {
461-
return String.format("LoggerStats{logged=%d, dropped=%d, queueSize=%d, fileSize=%d, file=%s}",
462-
totalLinesLogged, totalLinesDropped, queueSize, currentFileSize, currentLogFile);
696+
return String.format("LoggerStats{logged=%d, dropped=%d, compressed=%d, queueSize=%d, fileSize=%d, file=%s}",
697+
totalLinesLogged, totalLinesDropped, totalFilesCompressed, queueSize, currentFileSize, currentLogFile);
463698
}
464699
}
465700

@@ -471,6 +706,20 @@ public void close() {
471706
if (closed.compareAndSet(false, true)) {
472707
TapLogger.info(TAG, "Closing FileLogger...");
473708

709+
// Stop the compression executor if enabled
710+
if (compressExecutor != null) {
711+
try {
712+
compressExecutor.shutdown();
713+
if (!compressExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
714+
compressExecutor.shutdownNow();
715+
}
716+
} catch (InterruptedException e) {
717+
Thread.currentThread().interrupt();
718+
compressExecutor.shutdownNow();
719+
TapLogger.warn(TAG, "Interrupted while waiting for compression executor to finish");
720+
}
721+
}
722+
474723
// Stop the writer thread
475724
running.set(false);
476725

0 commit comments

Comments
 (0)