Skip to content

Commit 6eadc4d

Browse files
author
lmj
committed
Extend CLI with monitor/transform and task commands
1 parent 0f58784 commit 6eadc4d

10 files changed

Lines changed: 1436 additions & 12 deletions

File tree

cli/pom.xml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,28 @@
1515
<name>DB Syncer CLI</name>
1616
<description>Command-line interface tool</description>
1717

18-
<dependencies>
18+
<dependencies>
1919
<!-- Internal -->
2020
<dependency>
2121
<groupId>com.dbsyncer</groupId>
2222
<artifactId>common</artifactId>
2323
<version>${project.version}</version>
2424
</dependency>
25+
<dependency>
26+
<groupId>org.apache.kafka</groupId>
27+
<artifactId>connect-json</artifactId>
28+
<version>${kafka.version}</version>
29+
</dependency>
2530
<dependency>
2631
<groupId>com.dbsyncer</groupId>
2732
<artifactId>metadata-service</artifactId>
2833
<version>${project.version}</version>
2934
</dependency>
35+
<dependency>
36+
<groupId>com.dbsyncer</groupId>
37+
<artifactId>transformations</artifactId>
38+
<version>${project.version}</version>
39+
</dependency>
3040

3141
<!-- Picocli -->
3242
<dependency>

cli/src/main/java/com/dbsyncer/cli/command/ConfigCommand.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,28 @@ public static class GenerateCommand implements Runnable {
142142
@Option(names = {"-o", "--output"}, description = "Output file path")
143143
private String outputFile;
144144

145+
@Option(names = {"--enable-type-mapping"}, description = "Enable ApplyTypeMapping SMT on source connector", defaultValue = "false")
146+
private boolean enableTypeMapping;
147+
148+
@Option(names = {"--type-mapping-source-db"}, description = "Source DB for type mapping (mysql, oracle, postgresql)")
149+
private String typeMappingSourceDb;
150+
151+
@Option(names = {"--type-mapping-enable-time"}, description = "Enable time logical mapping (default: true)", defaultValue = "true")
152+
private boolean typeMappingEnableTime;
153+
154+
@Option(names = {"--type-mapping-enable-json"}, description = "Enable JSON logical mapping (default: true)", defaultValue = "true")
155+
private boolean typeMappingEnableJson;
156+
145157
public GenerateCommand(CliConfigService configService) {
146158
this.configService = configService;
147159
}
148160

149161
@Override
150162
public void run() {
151163
try {
152-
String template = configService.generateConfigTemplate(taskIdentifier, connectorType);
164+
String template = configService.generateConfigTemplate(taskIdentifier, connectorType,
165+
enableTypeMapping, typeMappingSourceDb,
166+
typeMappingEnableTime, typeMappingEnableJson);
153167
if (outputFile != null) {
154168
java.nio.file.Files.writeString(java.nio.file.Path.of(outputFile), template);
155169
System.out.println("Configuration template written to: " + outputFile);

cli/src/main/java/com/dbsyncer/cli/command/DbSyncerCommand.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
version = "1.0.0-SNAPSHOT",
1313
subcommands = {
1414
TaskCommand.class,
15+
TransformCommand.class,
1516
StatusCommand.class,
16-
ConfigCommand.class
17+
ConfigCommand.class,
18+
MonitorCommand.class
1719
}
1820
)
1921
public class DbSyncerCommand implements Runnable {
@@ -32,6 +34,7 @@ public void run() {
3234
System.out.println(" task Manage migration tasks");
3335
System.out.println(" status Query task status and progress");
3436
System.out.println(" config Manage connector configurations");
37+
System.out.println(" monitor Monitor a running task");
3538
System.out.println();
3639
System.out.println("Use 'dbsyncer <command> --help' for more information about a command.");
3740
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package com.dbsyncer.cli.command;
2+
3+
import com.dbsyncer.cli.service.CliTaskService;
4+
import com.dbsyncer.metadata.dto.TaskResponse;
5+
import com.dbsyncer.metadata.entity.TableProgress;
6+
import org.springframework.stereotype.Component;
7+
import picocli.CommandLine.Command;
8+
import picocli.CommandLine.Option;
9+
import picocli.CommandLine.Parameters;
10+
11+
import java.time.Duration;
12+
import java.time.OffsetDateTime;
13+
import java.util.List;
14+
import java.util.UUID;
15+
16+
@Component
17+
@Command(
18+
name = "monitor",
19+
description = "Monitor a migration task with a terminal dashboard",
20+
mixinStandardHelpOptions = true
21+
)
22+
public class MonitorCommand implements Runnable {
23+
24+
private final CliTaskService taskService;
25+
26+
@Parameters(index = "0", description = "Task ID or name")
27+
private String taskIdentifier;
28+
29+
@Option(names = {"-i", "--interval"}, description = "Refresh interval in seconds", defaultValue = "2")
30+
private int refreshInterval;
31+
32+
public MonitorCommand(CliTaskService taskService) {
33+
this.taskService = taskService;
34+
}
35+
36+
@Override
37+
public void run() {
38+
try {
39+
UUID taskId = resolveTaskId(taskIdentifier);
40+
System.out.println("Monitoring task '" + taskIdentifier + "' (Ctrl+C to stop)...");
41+
while (true) {
42+
clearScreen();
43+
renderDashboard(taskId);
44+
Thread.sleep(refreshInterval * 1000L);
45+
}
46+
} catch (InterruptedException e) {
47+
Thread.currentThread().interrupt();
48+
System.out.println("\nMonitor stopped.");
49+
} catch (Exception e) {
50+
System.err.println("Error monitoring task: " + e.getMessage());
51+
System.exit(1);
52+
}
53+
}
54+
55+
private void renderDashboard(UUID taskId) {
56+
TaskResponse task = taskService.getTask(taskId.toString());
57+
List<TableProgress> tables = taskService.getTableProgress(taskId);
58+
Long totalProcessed = taskService.getTotalRowsProcessed(taskId);
59+
Long totalEstimated = taskService.getTotalEstimatedRows(taskId);
60+
Long etaSeconds = taskService.estimateEtaSeconds(taskId);
61+
Double avgLag = taskService.getAverageLag(taskId);
62+
63+
if (totalProcessed == null) {
64+
totalProcessed = 0L;
65+
}
66+
if (totalEstimated == null) {
67+
totalEstimated = 0L;
68+
}
69+
70+
double overallPercent = calculateOverallPercent(totalProcessed, totalEstimated, task);
71+
String overallBar = progressBar(overallPercent, 40);
72+
73+
System.out.println("=== Task Monitor ===");
74+
System.out.println("ID: " + task.getId());
75+
System.out.println("Name: " + task.getTaskName());
76+
System.out.println("Status: " + task.getStatus());
77+
System.out.println();
78+
79+
System.out.printf("Progress: %s %5.1f%%%n", overallBar, overallPercent);
80+
System.out.printf("Tables: %d completed / %d total%n",
81+
task.getCompletedTables() != null ? task.getCompletedTables() : 0,
82+
task.getTotalTables() != null ? task.getTotalTables() : 0);
83+
System.out.printf("Records: %d processed / %d total%n",
84+
totalProcessed,
85+
totalEstimated > 0 ? totalEstimated : 0);
86+
87+
Double rate = calculateThroughput(task, totalProcessed);
88+
System.out.printf("Rate: %s records/s%n", rate != null ? String.format("%.1f", rate) : "N/A");
89+
System.out.printf("ETA: %s%n", etaSeconds != null ? formatDuration(etaSeconds) : "N/A");
90+
System.out.printf("Avg lag: %s ms%n", avgLag != null ? String.format("%.0f", avgLag) : "N/A");
91+
92+
System.out.println();
93+
System.out.println("--- Table Progress ---");
94+
if (tables.isEmpty()) {
95+
System.out.println("No table progress data available.");
96+
} else {
97+
System.out.printf("%-30s %-12s %-20s %-10s%n",
98+
"TABLE", "STATUS", "PROGRESS", "LAG (ms)");
99+
System.out.println("-".repeat(80));
100+
for (TableProgress tp : tables) {
101+
long est = tp.getEstimatedRows() != null ? tp.getEstimatedRows() : 0L;
102+
long processed = tp.getTotalRowsProcessed();
103+
double pct = est > 0 ? processed * 100.0 / est : 0.0;
104+
String bar = progressBar(pct, 20);
105+
String tableName = (tp.getSourceSchema() != null ? tp.getSourceSchema() + "." : "") + tp.getSourceTable();
106+
String lag = tp.getCurrentLagMs() != null ? tp.getCurrentLagMs().toString() : "N/A";
107+
System.out.printf("%-30s %-12s %s %5.1f%%%s %-10s%n",
108+
truncate(tableName, 30),
109+
tp.getStatus(),
110+
bar,
111+
pct,
112+
pct < 10 ? " " : "",
113+
lag
114+
);
115+
}
116+
}
117+
}
118+
119+
private double calculateOverallPercent(Long totalProcessed, Long totalEstimated, TaskResponse task) {
120+
if (totalEstimated != null && totalEstimated > 0) {
121+
return totalProcessed * 100.0 / totalEstimated;
122+
}
123+
if (task.getProgressPercentage() != null) {
124+
return task.getProgressPercentage();
125+
}
126+
if (task.getTotalTables() != null && task.getTotalTables() > 0 &&
127+
task.getCompletedTables() != null) {
128+
return task.getCompletedTables() * 100.0 / task.getTotalTables();
129+
}
130+
return 0.0;
131+
}
132+
133+
private Double calculateThroughput(TaskResponse task, Long totalProcessed) {
134+
OffsetDateTime startedAt = task.getStartedAt();
135+
if (startedAt == null || totalProcessed == null || totalProcessed <= 0) {
136+
return null;
137+
}
138+
long elapsedSeconds = Duration.between(startedAt, OffsetDateTime.now()).getSeconds();
139+
if (elapsedSeconds <= 0) {
140+
return null;
141+
}
142+
return totalProcessed / (double) elapsedSeconds;
143+
}
144+
145+
private String progressBar(double percent, int width) {
146+
if (percent < 0) {
147+
percent = 0;
148+
}
149+
if (percent > 100) {
150+
percent = 100;
151+
}
152+
int filled = (int) Math.round(percent * width / 100.0);
153+
StringBuilder sb = new StringBuilder();
154+
sb.append("[");
155+
for (int i = 0; i < width; i++) {
156+
sb.append(i < filled ? "#" : ".");
157+
}
158+
sb.append("]");
159+
return sb.toString();
160+
}
161+
162+
private String formatDuration(long seconds) {
163+
long s = seconds;
164+
long h = s / 3600;
165+
s %= 3600;
166+
long m = s / 60;
167+
s %= 60;
168+
if (h > 0) {
169+
return String.format("%dh %02dm %02ds", h, m, s);
170+
}
171+
if (m > 0) {
172+
return String.format("%dm %02ds", m, s);
173+
}
174+
return s + "s";
175+
}
176+
177+
private void clearScreen() {
178+
System.out.print("\033[H\033[2J");
179+
System.out.flush();
180+
}
181+
182+
private String truncate(String str, int maxLength) {
183+
if (str == null) {
184+
return "";
185+
}
186+
if (str.length() <= maxLength) {
187+
return str;
188+
}
189+
return str.substring(0, maxLength - 3) + "...";
190+
}
191+
192+
private UUID resolveTaskId(String identifier) {
193+
try {
194+
return UUID.fromString(identifier);
195+
} catch (IllegalArgumentException e) {
196+
TaskResponse task = taskService.getTask(identifier);
197+
return task.getId();
198+
}
199+
}
200+
}
201+

0 commit comments

Comments
 (0)