Skip to content

Commit cf35eec

Browse files
authored
[Fix][Connector-V2] Fix jdbc sink statement buffer wrong time to clear (#8653)
1 parent 48253da commit cf35eec

File tree

6 files changed

+466
-6
lines changed

6 files changed

+466
-6
lines changed

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferReducedBatchStatementExecutor.java

-3
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,6 @@ public void closeStatements() throws SQLException {
101101
executeBatch();
102102
}
103103
} finally {
104-
if (!buffer.isEmpty()) {
105-
buffer.clear();
106-
}
107104
upsertExecutor.closeStatements();
108105
deleteExecutor.closeStatements();
109106
}

Diff for: seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BufferedBatchStatementExecutor.java

-3
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,6 @@ public void closeStatements() throws SQLException {
6262
executeBatch();
6363
}
6464
} finally {
65-
if (!buffer.isEmpty()) {
66-
buffer.clear();
67-
}
6865
statementExecutor.closeStatements();
6966
}
7067
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;
19+
20+
import java.sql.Array;
21+
import java.sql.Blob;
22+
import java.sql.CallableStatement;
23+
import java.sql.Clob;
24+
import java.sql.Connection;
25+
import java.sql.DatabaseMetaData;
26+
import java.sql.NClob;
27+
import java.sql.PreparedStatement;
28+
import java.sql.SQLClientInfoException;
29+
import java.sql.SQLException;
30+
import java.sql.SQLWarning;
31+
import java.sql.SQLXML;
32+
import java.sql.Savepoint;
33+
import java.sql.Statement;
34+
import java.sql.Struct;
35+
import java.util.Collections;
36+
import java.util.Map;
37+
import java.util.Properties;
38+
import java.util.concurrent.Executor;
39+
40+
public class TestConnection implements Connection {
41+
@Override
42+
public Statement createStatement() throws SQLException {
43+
return null;
44+
}
45+
46+
@Override
47+
public PreparedStatement prepareStatement(String sql) throws SQLException {
48+
return null;
49+
}
50+
51+
@Override
52+
public CallableStatement prepareCall(String sql) throws SQLException {
53+
return null;
54+
}
55+
56+
@Override
57+
public String nativeSQL(String sql) throws SQLException {
58+
return "";
59+
}
60+
61+
@Override
62+
public void setAutoCommit(boolean autoCommit) throws SQLException {}
63+
64+
@Override
65+
public boolean getAutoCommit() throws SQLException {
66+
return false;
67+
}
68+
69+
@Override
70+
public void commit() throws SQLException {}
71+
72+
@Override
73+
public void rollback() throws SQLException {}
74+
75+
@Override
76+
public void close() throws SQLException {}
77+
78+
@Override
79+
public boolean isClosed() throws SQLException {
80+
return false;
81+
}
82+
83+
@Override
84+
public DatabaseMetaData getMetaData() throws SQLException {
85+
return null;
86+
}
87+
88+
@Override
89+
public void setReadOnly(boolean readOnly) throws SQLException {}
90+
91+
@Override
92+
public boolean isReadOnly() throws SQLException {
93+
return false;
94+
}
95+
96+
@Override
97+
public void setCatalog(String catalog) throws SQLException {}
98+
99+
@Override
100+
public String getCatalog() throws SQLException {
101+
return "";
102+
}
103+
104+
@Override
105+
public void setTransactionIsolation(int level) throws SQLException {}
106+
107+
@Override
108+
public int getTransactionIsolation() throws SQLException {
109+
return 0;
110+
}
111+
112+
@Override
113+
public SQLWarning getWarnings() throws SQLException {
114+
return null;
115+
}
116+
117+
@Override
118+
public void clearWarnings() throws SQLException {}
119+
120+
@Override
121+
public Statement createStatement(int resultSetType, int resultSetConcurrency)
122+
throws SQLException {
123+
return null;
124+
}
125+
126+
@Override
127+
public PreparedStatement prepareStatement(
128+
String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
129+
return null;
130+
}
131+
132+
@Override
133+
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
134+
throws SQLException {
135+
return null;
136+
}
137+
138+
@Override
139+
public Map<String, Class<?>> getTypeMap() throws SQLException {
140+
return Collections.emptyMap();
141+
}
142+
143+
@Override
144+
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {}
145+
146+
@Override
147+
public void setHoldability(int holdability) throws SQLException {}
148+
149+
@Override
150+
public int getHoldability() throws SQLException {
151+
return 0;
152+
}
153+
154+
@Override
155+
public Savepoint setSavepoint() throws SQLException {
156+
return null;
157+
}
158+
159+
@Override
160+
public Savepoint setSavepoint(String name) throws SQLException {
161+
return null;
162+
}
163+
164+
@Override
165+
public void rollback(Savepoint savepoint) throws SQLException {}
166+
167+
@Override
168+
public void releaseSavepoint(Savepoint savepoint) throws SQLException {}
169+
170+
@Override
171+
public Statement createStatement(
172+
int resultSetType, int resultSetConcurrency, int resultSetHoldability)
173+
throws SQLException {
174+
return null;
175+
}
176+
177+
@Override
178+
public PreparedStatement prepareStatement(
179+
String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
180+
throws SQLException {
181+
return null;
182+
}
183+
184+
@Override
185+
public CallableStatement prepareCall(
186+
String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
187+
throws SQLException {
188+
return null;
189+
}
190+
191+
@Override
192+
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys)
193+
throws SQLException {
194+
return null;
195+
}
196+
197+
@Override
198+
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
199+
return null;
200+
}
201+
202+
@Override
203+
public PreparedStatement prepareStatement(String sql, String[] columnNames)
204+
throws SQLException {
205+
return null;
206+
}
207+
208+
@Override
209+
public Clob createClob() throws SQLException {
210+
return null;
211+
}
212+
213+
@Override
214+
public Blob createBlob() throws SQLException {
215+
return null;
216+
}
217+
218+
@Override
219+
public NClob createNClob() throws SQLException {
220+
return null;
221+
}
222+
223+
@Override
224+
public SQLXML createSQLXML() throws SQLException {
225+
return null;
226+
}
227+
228+
@Override
229+
public boolean isValid(int timeout) throws SQLException {
230+
return false;
231+
}
232+
233+
@Override
234+
public void setClientInfo(String name, String value) throws SQLClientInfoException {}
235+
236+
@Override
237+
public void setClientInfo(Properties properties) throws SQLClientInfoException {}
238+
239+
@Override
240+
public String getClientInfo(String name) throws SQLException {
241+
return "";
242+
}
243+
244+
@Override
245+
public Properties getClientInfo() throws SQLException {
246+
return null;
247+
}
248+
249+
@Override
250+
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
251+
return null;
252+
}
253+
254+
@Override
255+
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
256+
return null;
257+
}
258+
259+
@Override
260+
public void setSchema(String schema) throws SQLException {}
261+
262+
@Override
263+
public String getSchema() throws SQLException {
264+
return "";
265+
}
266+
267+
@Override
268+
public void abort(Executor executor) throws SQLException {}
269+
270+
@Override
271+
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {}
272+
273+
@Override
274+
public int getNetworkTimeout() throws SQLException {
275+
return 0;
276+
}
277+
278+
@Override
279+
public <T> T unwrap(Class<T> iface) throws SQLException {
280+
return null;
281+
}
282+
283+
@Override
284+
public boolean isWrapperFor(Class<?> iface) throws SQLException {
285+
return false;
286+
}
287+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
19+
20+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
21+
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.TestConnection;
22+
23+
import org.junit.jupiter.api.Assertions;
24+
import org.junit.jupiter.api.Test;
25+
26+
import java.sql.SQLException;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
30+
public abstract class BufferExecutorTest {
31+
32+
abstract JdbcBatchStatementExecutor<SeaTunnelRow> getExecutorWithBatchRecorder(
33+
List<SeaTunnelRow> recorder);
34+
35+
@Test
36+
void testCacheAlwaysExistWhenInsertFailed() throws SQLException {
37+
List<SeaTunnelRow> recorder = new ArrayList<>();
38+
39+
JdbcBatchStatementExecutor<SeaTunnelRow> executor = getExecutorWithBatchRecorder(recorder);
40+
executor.prepareStatements(new TestConnection());
41+
executor.addToBatch(new SeaTunnelRow(new Object[] {"test"}));
42+
43+
SQLException exception =
44+
Assertions.assertThrows(SQLException.class, executor::executeBatch);
45+
Assertions.assertEquals("test", exception.getMessage());
46+
// the main point of this test is to check if the buffer is cleared after closeStatements
47+
// and prepareStatements when executeBatch failed
48+
Assertions.assertThrows(SQLException.class, executor::closeStatements);
49+
executor.prepareStatements(new TestConnection());
50+
SQLException exception2 =
51+
Assertions.assertThrows(SQLException.class, executor::executeBatch);
52+
Assertions.assertEquals("test", exception2.getMessage());
53+
54+
// three times of addToBatch, 1. executeBatch, 2. closeStatements, 3. executeBatch
55+
Assertions.assertEquals(3, recorder.size());
56+
// same row to executeBatch
57+
Assertions.assertEquals(recorder.get(0), recorder.get(2));
58+
}
59+
}

0 commit comments

Comments
 (0)