Skip to content

Commit 3a8d2a9

Browse files
committed
[Improve][log] Add logs in transform/sink error
1 parent 2cc4903 commit 3a8d2a9

File tree

8 files changed

+134
-60
lines changed

8 files changed

+134
-60
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,15 @@ public List<MultiTableAggregatedCommitInfo> commit(
8585
.get(sinkIdentifier))
8686
.filter(Objects::nonNull)
8787
.collect(Collectors.toList());
88-
List errCommitList = sinkCommitter.commit(commitInfo);
88+
List errCommitList;
89+
try {
90+
errCommitList = sinkCommitter.commit(commitInfo);
91+
} catch (Exception e) {
92+
String message =
93+
String.format("table %s commit throw an error", sinkIdentifier);
94+
log.error(message, e);
95+
throw new RuntimeException(message, e);
96+
}
8997
if (errCommitList.size() == 0) {
9098
continue;
9199
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkCommitter.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919

2020
import org.apache.seatunnel.api.sink.SinkCommitter;
2121

22+
import lombok.extern.slf4j.Slf4j;
23+
2224
import java.io.IOException;
2325
import java.util.ArrayList;
2426
import java.util.List;
2527
import java.util.Map;
2628
import java.util.Objects;
2729
import java.util.stream.Collectors;
2830

31+
@Slf4j
2932
public class MultiTableSinkCommitter implements SinkCommitter<MultiTableCommitInfo> {
3033

3134
private final Map<String, SinkCommitter<?>> sinkCommitters;
@@ -54,7 +57,14 @@ public List<MultiTableCommitInfo> commit(List<MultiTableCommitInfo> commitInfos)
5457
sinkIdentifier)))
5558
.map(Map.Entry::getValue)
5659
.collect(Collectors.toList());
57-
sinkCommitter.commit(commitInfo);
60+
try {
61+
sinkCommitter.commit(commitInfo);
62+
} catch (Exception e) {
63+
String message =
64+
String.format("table %s commit throw an error", sinkIdentifier);
65+
log.error(message, e);
66+
throw new RuntimeException(message, e);
67+
}
5868
}
5969
}
6070
return new ArrayList<>();

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java

+20-4
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,17 @@ public List<MultiTableState> snapshotState(long checkpointId) throws IOException
220220
for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriterEntry :
221221
sinkWritersWithIndex.get(i).entrySet()) {
222222
synchronized (runnable.get(i)) {
223-
List states = sinkWriterEntry.getValue().snapshotState(checkpointId);
224-
multiTableState.getStates().put(sinkWriterEntry.getKey(), states);
223+
try {
224+
List<?> states = sinkWriterEntry.getValue().snapshotState(checkpointId);
225+
multiTableState.getStates().put(sinkWriterEntry.getKey(), states);
226+
} catch (Exception e) {
227+
String message =
228+
String.format(
229+
"table %s snapshotState throw an error",
230+
sinkWriterEntry.getKey().getTableIdentifier());
231+
log.error(message, e);
232+
throw new RuntimeException(message, e);
233+
}
225234
}
226235
}
227236
}
@@ -257,8 +266,15 @@ public Optional<MultiTableCommitInfo> prepareCommit(long checkpointId) throws IO
257266
SinkWriter<SeaTunnelRow, ?, ?> sinkWriter =
258267
sinkWriterEntry.getValue();
259268
commit = sinkWriter.prepareCommit(checkpointId);
260-
} catch (IOException e) {
261-
throw new RuntimeException(e);
269+
} catch (Exception e) {
270+
String message =
271+
String.format(
272+
"table %s prepareCommit throw an error",
273+
sinkWriterEntry
274+
.getKey()
275+
.getTableIdentifier());
276+
log.error(message, e);
277+
throw new RuntimeException(message, e);
262278
}
263279
commit.ifPresent(
264280
o ->

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/Record.java

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.Serializable;
2121

2222
/** Contain {@link SeaTunnelRow} or Checkpoint Barrier */
23+
@ToString
2324
public class Record<T> implements Serializable {
2425

2526
private final T data;

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java

+1
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ public void received(Record<?> record) {
299299
}
300300
}
301301
} catch (Exception e) {
302+
log.error("Failed to receive record: {}", record, e);
302303
throw new RuntimeException(e);
303304
}
304305
}

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java

+52-46
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
3232
import org.apache.seatunnel.engine.server.task.record.Barrier;
3333

34-
import org.apache.commons.collections4.CollectionUtils;
35-
3634
import lombok.extern.slf4j.Slf4j;
3735

3836
import java.io.IOException;
@@ -79,53 +77,59 @@ public void open() throws Exception {
7977

8078
@Override
8179
public void received(Record<?> record) {
82-
if (record.getData() instanceof Barrier) {
83-
CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
84-
if (barrier.prepareClose(this.runningTask.getTaskLocation())) {
85-
prepareClose = true;
86-
}
87-
if (barrier.snapshot()) {
88-
runningTask.addState(barrier, ActionStateKey.of(action), Collections.emptyList());
89-
}
90-
// ack after #addState
91-
runningTask.ack(barrier);
92-
collector.collect(record);
93-
} else if (record.getData() instanceof SchemaChangeEvent) {
94-
if (prepareClose) {
95-
return;
96-
}
97-
SchemaChangeEvent event = (SchemaChangeEvent) record.getData();
98-
for (SeaTunnelTransform<T> t : transform) {
99-
SchemaChangeEvent eventBefore = event;
100-
event = t.mapSchemaChangeEvent(eventBefore);
101-
if (event == null) {
80+
try {
81+
if (record.getData() instanceof Barrier) {
82+
CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
83+
if (barrier.prepareClose(this.runningTask.getTaskLocation())) {
84+
prepareClose = true;
85+
}
86+
if (barrier.snapshot()) {
87+
runningTask.addState(
88+
barrier, ActionStateKey.of(action), Collections.emptyList());
89+
}
90+
// ack after #addState
91+
runningTask.ack(barrier);
92+
collector.collect(record);
93+
} else if (record.getData() instanceof SchemaChangeEvent) {
94+
if (prepareClose) {
95+
return;
96+
}
97+
SchemaChangeEvent event = (SchemaChangeEvent) record.getData();
98+
for (SeaTunnelTransform<T> t : transform) {
99+
SchemaChangeEvent eventBefore = event;
100+
event = t.mapSchemaChangeEvent(eventBefore);
101+
if (event == null) {
102+
log.info(
103+
"Transform[{}] filtered schema change event {}",
104+
t.getPluginName(),
105+
eventBefore);
106+
break;
107+
}
102108
log.info(
103-
"Transform[{}] filtered schema change event {}",
109+
"Transform[{}] input schema change event {} and output schema change event {}",
104110
t.getPluginName(),
105-
eventBefore);
106-
break;
111+
eventBefore,
112+
event);
107113
}
108-
log.info(
109-
"Transform[{}] input schema change event {} and output schema change event {}",
110-
t.getPluginName(),
111-
eventBefore,
112-
event);
113-
}
114-
if (event != null) {
115-
collector.collect(new Record<>(event));
116-
}
117-
} else {
118-
if (prepareClose) {
119-
return;
120-
}
121-
T inputData = (T) record.getData();
122-
List<T> outputDataList = transform(inputData);
123-
if (!outputDataList.isEmpty()) {
124-
// todo log metrics
125-
for (T outputData : outputDataList) {
126-
collector.collect(new Record<>(outputData));
114+
if (event != null) {
115+
collector.collect(new Record<>(event));
116+
}
117+
} else {
118+
if (prepareClose) {
119+
return;
120+
}
121+
T inputData = (T) record.getData();
122+
List<T> outputDataList = transform(inputData);
123+
if (!outputDataList.isEmpty()) {
124+
// todo log metrics
125+
for (T outputData : outputDataList) {
126+
collector.collect(new Record<>(outputData));
127+
}
127128
}
128129
}
130+
} catch (Exception e) {
131+
log.error("Failed to receive record: {}", record, e);
132+
throw e;
129133
}
130134
}
131135

@@ -149,9 +153,11 @@ public List<T> transform(T inputData) {
149153
transformer,
150154
data,
151155
outputDataArray);
152-
if (CollectionUtils.isNotEmpty(outputDataArray)) {
153-
nextInputDataList.addAll(outputDataArray);
156+
if (outputDataArray == null || outputDataArray.isEmpty()) {
157+
log.trace("Transform[{}] filtered data row {}", transformer, data);
158+
continue;
154159
}
160+
nextInputDataList.addAll(outputDataArray);
155161
}
156162
} else if (transformer instanceof SeaTunnelMapTransform) {
157163
for (T data : dataList) {

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogFlatMapTransform.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2323
import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform;
2424

25+
import lombok.extern.slf4j.Slf4j;
26+
2527
import java.util.List;
2628

2729
/** Abstract class for multi-table flat map transform. */
30+
@Slf4j
2831
public abstract class AbstractMultiCatalogFlatMapTransform extends AbstractMultiCatalogTransform
2932
implements SeaTunnelFlatMapTransform<SeaTunnelRow> {
3033

@@ -35,12 +38,24 @@ public AbstractMultiCatalogFlatMapTransform(
3538

3639
@Override
3740
public List<SeaTunnelRow> flatMap(SeaTunnelRow row) {
41+
String tableId;
42+
SeaTunnelFlatMapTransform<SeaTunnelRow> transform;
3843
if (transformMap.size() == 1) {
39-
return ((SeaTunnelFlatMapTransform<SeaTunnelRow>)
40-
transformMap.values().iterator().next())
41-
.flatMap(row);
44+
tableId = transformMap.keySet().iterator().next();
45+
} else {
46+
tableId = row.getTableId();
47+
}
48+
transform = (SeaTunnelFlatMapTransform<SeaTunnelRow>) transformMap.get(tableId);
49+
50+
try {
51+
return transform.flatMap(row);
52+
} catch (Exception e) {
53+
String message =
54+
String.format(
55+
"The transform %s map table %s data throw an error",
56+
transform.getPluginName(), tableId);
57+
log.error(message, e);
58+
throw new RuntimeException(message, e);
4259
}
43-
return ((SeaTunnelFlatMapTransform<SeaTunnelRow>) transformMap.get(row.getTableId()))
44-
.flatMap(row);
4560
}
4661
}

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogMapTransform.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2323
import org.apache.seatunnel.api.transform.SeaTunnelMapTransform;
2424

25+
import lombok.extern.slf4j.Slf4j;
26+
2527
import java.util.List;
2628

2729
/** Abstract class for multi-table map transform. */
30+
@Slf4j
2831
public abstract class AbstractMultiCatalogMapTransform extends AbstractMultiCatalogTransform
2932
implements SeaTunnelMapTransform<SeaTunnelRow> {
3033

@@ -35,10 +38,24 @@ public AbstractMultiCatalogMapTransform(
3538

3639
@Override
3740
public SeaTunnelRow map(SeaTunnelRow row) {
41+
String tableId;
42+
SeaTunnelMapTransform<SeaTunnelRow> transform;
3843
if (transformMap.size() == 1) {
39-
return ((SeaTunnelMapTransform<SeaTunnelRow>) transformMap.values().iterator().next())
40-
.map(row);
44+
tableId = transformMap.keySet().iterator().next();
45+
} else {
46+
tableId = row.getTableId();
47+
}
48+
transform = (SeaTunnelMapTransform<SeaTunnelRow>) transformMap.get(tableId);
49+
50+
try {
51+
return transform.map(row);
52+
} catch (Exception e) {
53+
String message =
54+
String.format(
55+
"The transform %s map table %s data throw an error",
56+
transform.getPluginName(), tableId);
57+
log.error(message, e);
58+
throw new RuntimeException(message, e);
4159
}
42-
return ((SeaTunnelMapTransform<SeaTunnelRow>) transformMap.get(row.getTableId())).map(row);
4360
}
4461
}

0 commit comments

Comments
 (0)