Skip to content

Commit 5e0e376

Browse files
authored
[Improve][Zeta] Support restore execute savemode (#9059)
1 parent 9426b7b commit 5e0e376

File tree

6 files changed

+109
-4
lines changed

6 files changed

+109
-4
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java

+10
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,16 @@ public void handleDataSaveMode() {
117117
}
118118
}
119119

120+
@Override
121+
public void handleSchemaSaveModeWithRestore() {
122+
if (SchemaSaveMode.ERROR_WHEN_SCHEMA_NOT_EXIST == schemaSaveMode) {
123+
errorWhenSchemaNotExist();
124+
} else if (SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST == schemaSaveMode
125+
|| SchemaSaveMode.RECREATE_SCHEMA == schemaSaveMode) {
126+
createSchemaWhenNotExist();
127+
}
128+
}
129+
120130
protected void recreateSchema() {
121131
if (tableExists()) {
122132
dropTable();

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public interface SaveModeHandler extends AutoCloseable {
2828

2929
void handleDataSaveMode();
3030

31+
void handleSchemaSaveModeWithRestore();
32+
3133
SchemaSaveMode getSchemaSaveMode();
3234

3335
DataSaveMode getDataSaveMode();

seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandlerTest.java

+64
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,24 @@
2222
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
2323
import org.apache.seatunnel.api.table.catalog.InMemoryCatalog;
2424
import org.apache.seatunnel.api.table.catalog.InMemoryCatalogFactory;
25+
import org.apache.seatunnel.api.table.catalog.TablePath;
2526
import org.apache.seatunnel.api.table.type.BasicType;
2627
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2728
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
29+
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
2830

2931
import org.junit.jupiter.api.BeforeEach;
3032
import org.junit.jupiter.api.Test;
3133

3234
import static org.junit.jupiter.api.Assertions.assertFalse;
35+
import static org.junit.jupiter.api.Assertions.assertThrows;
3336
import static org.junit.jupiter.api.Assertions.assertTrue;
37+
import static org.mockito.ArgumentMatchers.any;
38+
import static org.mockito.ArgumentMatchers.eq;
39+
import static org.mockito.Mockito.mock;
40+
import static org.mockito.Mockito.times;
41+
import static org.mockito.Mockito.verify;
42+
import static org.mockito.Mockito.when;
3443

3544
public class DefaultSaveModeHandlerTest {
3645

@@ -115,6 +124,61 @@ public void shouldNotTruncateRecreatedTable() {
115124
"Should not truncate data for recreated table");
116125
}
117126

127+
@Test
128+
public void handlesErrorWhenSchemaNotExist() {
129+
Catalog catalog = mock(Catalog.class);
130+
CatalogTable catalogTable = createCatalogTable("notExistsTable");
131+
when(catalog.tableExists(any(TablePath.class))).thenReturn(false);
132+
DefaultSaveModeHandler handler =
133+
new DefaultSaveModeHandler(
134+
SchemaSaveMode.ERROR_WHEN_SCHEMA_NOT_EXIST,
135+
DataSaveMode.APPEND_DATA,
136+
catalog,
137+
catalogTable,
138+
null);
139+
140+
assertThrows(SeaTunnelRuntimeException.class, handler::handleSchemaSaveModeWithRestore);
141+
}
142+
143+
@Test
144+
public void createsSchemaWhenNotExist() {
145+
CatalogTable catalogTable = createCatalogTable("notExistsTable");
146+
147+
Catalog catalog = mock(Catalog.class);
148+
when(catalog.tableExists(any(TablePath.class))).thenReturn(false);
149+
DefaultSaveModeHandler handler =
150+
new DefaultSaveModeHandler(
151+
SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST,
152+
DataSaveMode.APPEND_DATA,
153+
catalog,
154+
catalogTable,
155+
null);
156+
157+
handler.handleSchemaSaveModeWithRestore();
158+
159+
verify(catalog, times(1))
160+
.createTable(any(TablePath.class), any(CatalogTable.class), eq(true));
161+
}
162+
163+
@Test
164+
public void recreatesSchemaWhenNotExist() {
165+
CatalogTable catalogTable = createCatalogTable("notExistsTable");
166+
Catalog catalog = mock(Catalog.class);
167+
when(catalog.tableExists(any(TablePath.class))).thenReturn(false);
168+
DefaultSaveModeHandler handler =
169+
new DefaultSaveModeHandler(
170+
SchemaSaveMode.RECREATE_SCHEMA,
171+
DataSaveMode.APPEND_DATA,
172+
catalog,
173+
catalogTable,
174+
null);
175+
176+
handler.handleSchemaSaveModeWithRestore();
177+
178+
verify(catalog, times(1))
179+
.createTable(any(TablePath.class), any(CatalogTable.class), eq(true));
180+
}
181+
118182
private CatalogTable createCatalogTable(String tableName) {
119183
return CatalogTableUtil.getCatalogTable("", "st", "public", tableName, rowType);
120184
}

seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySaveModeHandler.java

+3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public void handleDataSaveMode() {
5555
log.info("handle data savemode with table path: {}", catalogTable.getTablePath());
5656
}
5757

58+
@Override
59+
public void handleSchemaSaveModeWithRestore() {}
60+
5861
@Override
5962
public SchemaSaveMode getSchemaSaveMode() {
6063
return SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST;

seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java

+21
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,8 @@ private static <T> T findLast(LinkedHashMap<?, T> map) {
690690
actionConfig);
691691
if (!isStartWithSavePoint) {
692692
handleSaveMode(sink);
693+
} else {
694+
handleSchemaSaveModeWithRestore(sink);
693695
}
694696
sinkAction.setParallelism(parallelism);
695697
return sinkAction;
@@ -716,6 +718,25 @@ public void handleSaveMode(SeaTunnelSink<?, ?, ?, ?> sink) {
716718
}
717719
}
718720

721+
public void handleSchemaSaveModeWithRestore(SeaTunnelSink<?, ?, ?, ?> sink) {
722+
if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
723+
SupportSaveMode saveModeSink = (SupportSaveMode) sink;
724+
if (envOptions
725+
.get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
726+
.equals(SaveModeExecuteLocation.CLIENT)) {
727+
Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
728+
if (saveModeHandler.isPresent()) {
729+
try (SaveModeHandler handler = saveModeHandler.get()) {
730+
handler.open();
731+
handler.handleSchemaSaveModeWithRestore();
732+
} catch (Exception e) {
733+
throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
734+
}
735+
}
736+
}
737+
}
738+
}
739+
719740
private List<URL> getSourcePluginJarPaths(Config sourceConfig) {
720741
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
721742
PluginIdentifier pluginIdentifier =

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,8 @@ public synchronized void init(long initializationTimestamp, boolean restart) thr
271271
logicalVertexIdClassLoaderMap.get(
272272
sink.getId()));
273273
JobMaster.handleSaveMode(
274-
((SinkAction<?, ?, ?, ?>) sink).getSink());
274+
((SinkAction<?, ?, ?, ?>) sink).getSink(),
275+
logicalDag.isStartWithSavePoint());
275276
});
276277
Thread.currentThread().setContextClassLoader(appClassLoader);
277278
}
@@ -556,22 +557,26 @@ public void run() {
556557
}
557558
}
558559

559-
public static void handleSaveMode(SeaTunnelSink sink) {
560+
public static void handleSaveMode(SeaTunnelSink sink, boolean isStartWithSavePoint) {
560561
if (sink instanceof SupportSaveMode) {
561562
Optional<SaveModeHandler> saveModeHandler =
562563
((SupportSaveMode) sink).getSaveModeHandler();
563564
if (saveModeHandler.isPresent()) {
564565
try (SaveModeHandler handler = saveModeHandler.get()) {
565566
handler.open();
566-
new SaveModeExecuteWrapper(handler).execute();
567+
if (!isStartWithSavePoint) {
568+
new SaveModeExecuteWrapper(handler).execute();
569+
} else {
570+
handler.handleSchemaSaveModeWithRestore();
571+
}
567572
} catch (Exception e) {
568573
throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
569574
}
570575
}
571576
} else if (sink instanceof MultiTableSink) {
572577
Map<TablePath, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
573578
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
574-
handleSaveMode(seaTunnelSink);
579+
handleSaveMode(seaTunnelSink, isStartWithSavePoint);
575580
}
576581
}
577582
}

0 commit comments

Comments
 (0)