Skip to content

Commit a3b9e22

Browse files
committed
SafeSql.queryLazily()
1 parent 28c5478 commit a3b9e22

File tree

7 files changed

+240
-1
lines changed

7 files changed

+240
-1
lines changed

mug-guava/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@
7070
<groupId>com.google.guava</groupId>
7171
<artifactId>guava-testlib</artifactId>
7272
</dependency>
73+
74+
<dependency>
75+
<groupId>org.mockito</groupId>
76+
<artifactId>mockito-core</artifactId>
77+
<scope>test</scope>
78+
</dependency>
7379
<dependency>
7480
<groupId>com.h2database</groupId>
7581
<artifactId>h2</artifactId>

mug-guava/src/.DS_Store

8 KB
Binary file not shown.
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*****************************************************************************
2+
* ------------------------------------------------------------------------- *
3+
* Licensed under the Apache License, Version 2.0 (the "License"); *
4+
* you may not use this file except in compliance with the License. *
5+
* You may obtain a copy of the License at *
6+
* *
7+
* http://www.apache.org/licenses/LICENSE-2.0 *
8+
* *
9+
* Unless required by applicable law or agreed to in writing, software *
10+
* distributed under the License is distributed on an "AS IS" BASIS, *
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
12+
* See the License for the specific language governing permissions and *
13+
* limitations under the License. *
14+
*****************************************************************************/
15+
package com.google.mu.safesql;
16+
17+
import static com.google.common.base.Preconditions.checkState;
18+
19+
import java.io.IOException;
20+
import java.sql.SQLException;
21+
import java.util.stream.Stream;
22+
23+
import com.google.common.base.VerifyException;
24+
import com.google.common.io.Closer;
25+
import com.google.errorprone.annotations.MustBeClosed;
26+
27+
/** Helps manage JDBC resources that need to be closed now or lazily. */
28+
final class JdbcCloser implements AutoCloseable {
29+
interface JdbcCloseable {
30+
void run() throws SQLException;
31+
}
32+
33+
private final Closer closer = Closer.create();
34+
private boolean owned = true;
35+
36+
void register(JdbcCloseable closeable) {
37+
closer.register(() -> {
38+
try {
39+
closeable.run();
40+
} catch (SQLException e) {
41+
throw new UncheckedSqlException(e);
42+
}
43+
});
44+
}
45+
46+
@MustBeClosed
47+
<T> Stream<T> attachTo(Stream<T> stream) {
48+
checkState(owned);
49+
owned = false;
50+
return stream.onClose(this::closeNow);
51+
}
52+
53+
@Override public void close() {
54+
if (owned) {
55+
closeNow();
56+
}
57+
}
58+
59+
private void closeNow() {
60+
try {
61+
closer.close();
62+
} catch (IOException e) {
63+
throw new VerifyException(e);
64+
}
65+
}
66+
}

mug-guava/src/main/java/com/google/mu/safesql/SafeSql.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static com.google.mu.util.Substring.suffix;
3030
import static com.google.mu.util.Substring.word;
3131
import static com.google.mu.util.stream.MoreStreams.indexesFrom;
32+
import static com.google.mu.util.stream.MoreStreams.whileNotNull;
3233
import static java.util.Collections.emptyList;
3334
import static java.util.Collections.unmodifiableList;
3435
import static java.util.stream.Collectors.collectingAndThen;
@@ -47,6 +48,7 @@
4748
import java.util.Optional;
4849
import java.util.concurrent.ConcurrentHashMap;
4950
import java.util.concurrent.ConcurrentMap;
51+
import java.util.concurrent.atomic.AtomicReference;
5052
import java.util.function.Supplier;
5153
import java.util.stream.Collector;
5254
import java.util.stream.Collectors;
@@ -702,7 +704,8 @@ public SafeSql orElse(Supplier<SafeSql> fallback) {
702704
* .query(connection, row -> row.getLong("id"));
703705
* }</pre>
704706
*
705-
* <p>Internally it delegates to {@link PreparedStatement#executeQuery}.
707+
* <p>Internally it delegates to {@link PreparedStatement#executeQuery} or {@link
708+
* Statement#executeQuery} if this sql contains no JDBC binding parameters.
706709
*
707710
* @throws UncheckedSqlException wraps {@link SQLException} if failed
708711
*/
@@ -725,6 +728,64 @@ public <T> List<T> query(
725728
}
726729
}
727730

731+
/**
732+
* Executes the encapsulated SQL as a query against {@code connection}, sets {@code fetchSize}
733+
* using {@link Statement#setFetchSize}, and then fetches the results lazily in a stream.
734+
*
735+
* <p>The returned {@code Stream} includes results transformed by {@code rowMapper}.
736+
* The caller must close it using try-with-resources idiom, which will close the associated
737+
* statement and {@link ResultSet}.
738+
*
739+
* <p>For example: <pre>{@code
740+
* SafeSql sql = SafeSql.of("SELECT name FROM Users WHERE name LIKE '%{name}%'", name);
741+
* try (Stream<String> names = sql.query(connection, row -> row.getLong("id"))) {
742+
* return names.findFirst();
743+
* }
744+
* }</pre>
745+
*
746+
* <p>Internally it delegates to {@link PreparedStatement#executeQuery} or {@link
747+
* Statement#executeQuery} if this sql contains no JDBC binding parameters.
748+
*
749+
* @throws UncheckedSqlException wraps {@link SQLException} if failed
750+
* @since 8.4
751+
*/
752+
@MustBeClosed
753+
@SuppressWarnings("MustBeClosedChecker")
754+
public <T> Stream<T> queryLazily(
755+
Connection connection, int fetchSize, SqlFunction<? super ResultSet, ? extends T> rowMapper) {
756+
checkNotNull(rowMapper);
757+
if (paramValues.isEmpty()) {
758+
return lazy(connection::createStatement, fetchSize, stmt -> stmt.executeQuery(sql), rowMapper);
759+
}
760+
return lazy(() -> prepareStatement(connection), fetchSize, PreparedStatement::executeQuery, rowMapper);
761+
}
762+
763+
@MustBeClosed
764+
private static <S extends Statement, T> Stream<T> lazy(
765+
SqlSupplier<? extends S> createStatement,
766+
int fetchSize,
767+
SqlFunction<? super S, ResultSet> execute,
768+
SqlFunction<? super ResultSet, ? extends T> rowMapper) {
769+
try (JdbcCloser closer = new JdbcCloser()) {
770+
S stmt = createStatement.get();
771+
closer.register(stmt::close);
772+
stmt.setFetchSize(fetchSize);
773+
ResultSet resultSet = execute.apply(stmt);
774+
closer.register(resultSet::close);
775+
return closer.attachTo(
776+
whileNotNull(() -> {
777+
try {
778+
return resultSet.next() ? new AtomicReference<T>(rowMapper.apply(resultSet)) : null;
779+
} catch (SQLException e) {
780+
throw new UncheckedSqlException(e);
781+
}
782+
})
783+
.map(AtomicReference::get));
784+
} catch (SQLException e) {
785+
throw new UncheckedSqlException(e);
786+
}
787+
}
788+
728789
/**
729790
* Executes the encapsulated DML (create, update, delete statements) against {@code connection}
730791
* and returns the number of affected rows.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.google.mu.safesql;
2+
3+
import java.sql.SQLException;
4+
5+
/** For lambdas that can throw {@link SQLException}. */
6+
@FunctionalInterface
7+
public interface SqlSupplier<T> {
8+
T get() throws SQLException;
9+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.google.mu.safesql;
2+
3+
import static org.junit.Assert.assertThrows;
4+
import static org.mockito.Mockito.never;
5+
import static org.mockito.Mockito.verify;
6+
7+
import java.sql.Statement;
8+
import java.util.stream.Stream;
9+
10+
import org.junit.Before;
11+
import org.junit.Test;
12+
import org.junit.runner.RunWith;
13+
import org.mockito.Mock;
14+
import org.mockito.MockitoAnnotations;
15+
16+
import com.google.testing.junit.testparameterinjector.TestParameterInjector;
17+
18+
@RunWith(TestParameterInjector.class)
19+
public class JdbcCloserTest {
20+
@Mock private Statement statement;
21+
22+
@Before public void setUpMocks() {
23+
MockitoAnnotations.initMocks(this);
24+
}
25+
26+
@Test public void nothingRegistered() {
27+
try (JdbcCloser closer = new JdbcCloser()) {}
28+
}
29+
30+
@SuppressWarnings("MustBeClosedChecker")
31+
@Test public void attachTo_nothingRegistered() {
32+
Stream<String> stream = Stream.of("foo", "bar");
33+
try (JdbcCloser closer = new JdbcCloser()) {
34+
stream = closer.attachTo(stream);
35+
}
36+
try (Stream<String> closeMe = stream) {}
37+
}
38+
39+
@Test public void resourceClosed() throws Exception {
40+
try (JdbcCloser closer = new JdbcCloser()) {
41+
closer.register(statement::close);
42+
}
43+
verify(statement).close();
44+
}
45+
46+
@SuppressWarnings("MustBeClosedChecker")
47+
@Test public void resourceAttached() throws Exception {
48+
Stream<String> stream = Stream.of("foo", "bar");
49+
try (JdbcCloser closer = new JdbcCloser()) {
50+
closer.register(statement::close);
51+
stream = closer.attachTo(stream);
52+
}
53+
verify(statement, never()).close();
54+
55+
// Now close the stream
56+
try (Stream<?> closeMe = stream) {}
57+
verify(statement).close();
58+
}
59+
60+
@SuppressWarnings("MustBeClosedChecker")
61+
@Test public void cannotAttachTwice() throws Exception {
62+
Stream<String> stream = Stream.of("foo", "bar");
63+
try (JdbcCloser closer = new JdbcCloser()) {
64+
closer.register(statement::close);
65+
stream = closer.attachTo(stream);
66+
assertThrows(IllegalStateException.class, () -> closer.attachTo(Stream.empty()));
67+
}
68+
verify(statement, never()).close();
69+
try (Stream<?> closeMe = stream) {}
70+
verify(statement).close();
71+
}
72+
}

mug-guava/src/test/java/com/google/mu/safesql/SafeSqlDbTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import java.time.ZoneId;
1010
import java.time.ZonedDateTime;
1111
import java.util.List;
12+
import java.util.stream.Collectors;
13+
import java.util.stream.Stream;
1214

1315
import javax.sql.DataSource;
1416

@@ -72,6 +74,23 @@ protected DatabaseOperation getTearDownOperation() {
7274
.containsExactly("bar");
7375
}
7476

77+
@Test public void queryLazily() throws Exception {
78+
ZonedDateTime barTime = ZonedDateTime.of(2024, 11, 1, 10, 20, 30, 40, ZoneId.of("UTC"));
79+
assertThat(
80+
SafeSql.of("insert into ITEMS(id, title) VALUES({id}, {title})", testId(), "foo")
81+
.update(connection()))
82+
.isEqualTo(1);
83+
assertThat(
84+
SafeSql.of("insert into ITEMS(id, title, time) VALUES({id}, {title}, {time})", testId() + 1, "bar", barTime)
85+
.update(connection()))
86+
.isEqualTo(1);
87+
assertThat(queryColumnStream(
88+
SafeSql.of("select title from {tbl} where id = {id}", /* tbl */ SafeSql.of("ITEMS"), testId()), "title"))
89+
.containsExactly("foo");
90+
assertThat(queryColumnStream(SafeSql.of("select title from ITEMS where id = {id}", testId() + 1), "title"))
91+
.containsExactly("bar");
92+
}
93+
7594
@Test public void likeExpressionWithWildcardInArg() throws Exception {
7695
assertThat(
7796
SafeSql.of("insert into ITEMS(id, title) VALUES({id}, {title})", testId(), "foo")
@@ -312,6 +331,12 @@ private List<?> queryColumn(SafeSql sql, String column) throws Exception {
312331
return sql.query(connection(), resultSet -> resultSet.getObject(column));
313332
}
314333

334+
private List<?> queryColumnStream(SafeSql sql, String column) throws Exception {
335+
try (Stream<?> stream = sql.queryLazily(connection(), 1, resultSet -> resultSet.getObject(column))) {
336+
return stream.collect(Collectors.toList());
337+
}
338+
}
339+
315340
private Connection connection() throws Exception {
316341
return getConnection().getConnection();
317342
}

0 commit comments

Comments
 (0)