|
30 | 30 | import static java.util.stream.Collectors.collectingAndThen; |
31 | 31 | import static java.util.stream.Collectors.mapping; |
32 | 32 | import static java.util.stream.Collectors.toCollection; |
| 33 | +import static java.util.stream.Collectors.toList; |
33 | 34 |
|
34 | 35 | import java.sql.Connection; |
35 | 36 | import java.sql.PreparedStatement; |
|
38 | 39 | import java.sql.Statement; |
39 | 40 | import java.util.ArrayList; |
40 | 41 | import java.util.Collection; |
| 42 | +import java.util.HashMap; |
41 | 43 | import java.util.Iterator; |
| 44 | +import java.util.LinkedHashMap; |
42 | 45 | import java.util.LinkedHashSet; |
43 | 46 | import java.util.List; |
| 47 | +import java.util.Map; |
44 | 48 | import java.util.Optional; |
45 | 49 | import java.util.Set; |
46 | 50 | import java.util.Spliterators; |
|
69 | 73 | import com.google.mu.util.StringFormat.Template; |
70 | 74 | import com.google.mu.util.Substring; |
71 | 75 | import com.google.mu.util.stream.BiStream; |
| 76 | +import com.google.mu.util.stream.Iteration; |
72 | 77 |
|
73 | 78 | /** |
74 | 79 | * An injection-safe <em>dynamic SQL</em>, constructed using compile-time enforced templates. |
@@ -959,6 +964,223 @@ public <T> Optional<T> queryForOne( |
959 | 964 | throw e.asChecked(); |
960 | 965 | } |
961 | 966 | } |
| 967 | + |
| 968 | + /** |
| 969 | + * Paginate the encapsulated query by fetching up to {@code pageSize} rows for each page. |
| 970 | + * |
| 971 | + * <p>If {@code orderByKeys} is non-empty, the column values are used to control pagination. |
| 972 | + * For example, if {@code [groupId, timestamp]} can uniquely identify a row, the generated |
| 973 | + * SQL will use {@code "groupId" > ? OR ("groupId" = ? AND "timestamp" > ?)} for the next page, |
| 974 | + * comparing against the previous page's last row. |
| 975 | + * |
| 976 | + * <p>You can also use descending order, for example by passing {@code "NAME DESC"}. |
| 977 | + * The generated paginated query will then use {@code "NAME" < ?} instead of {@code "NAME > ?} |
| 978 | + * for the next page. |
| 979 | + * |
| 980 | + * <p>The standard SQL {@code FETCH NEXT ROWS} syntax is used for pagination. |
| 981 | + * If {@code orderByKeys} is empty, the paginated query will use {@code OFFSET} keyword |
| 982 | + * to specify the next page offset. |
| 983 | + * |
| 984 | + * <p>All column names are automatically double-quoted to avoid clashing with keywords or spaces |
| 985 | + * in identifiers. If your DB is case sensitive, make sure you pass in the accurate case. |
| 986 | + * |
| 987 | + * <p>The method is lazy. Paginated queries are only sent when the returned stream is consumed. |
| 988 | + * The caller should close the {@code connection} after done, which will close the cached |
| 989 | + * JDBC statements used for pagination. |
| 990 | + * |
| 991 | + * @param orderByKeys for example {@code "firstName"} or {@code "group_id DESC"}. |
| 992 | + * @param pageSize must be positive. |
| 993 | + * @param resultType the record or class type to be constructed for each row. |
| 994 | + * @return a lazy stream of pages. If the query returns no rows, the stream will be empty. |
| 995 | + * @throws IllegalArgumentException if {@code pageSize} isn't positive. |
| 996 | + * @throws NullPointerException if any parameter is null. |
| 997 | + * @since 9.9.3 |
| 998 | + */ |
| 999 | + public <T> Stream<List<T>> paginateLazily( |
| 1000 | + Connection connection, |
| 1001 | + Collection<String> orderByKeys, |
| 1002 | + int pageSize, |
| 1003 | + Class<? extends T> resultType) { |
| 1004 | + return paginateLazily(connection, stmt -> {}, orderByKeys, pageSize, resultType); |
| 1005 | + } |
| 1006 | + |
| 1007 | + /** |
| 1008 | + * Paginate the encapsulated query by fetching up to {@code pageSize} rows for each page. |
| 1009 | + * |
| 1010 | + * <p>If {@code orderByKeys} is non-empty, the column values are used to control pagination. |
| 1011 | + * For example, if {@code [groupId, timestamp]} can uniquely identify a row, the generated |
| 1012 | + * SQL will use {@code "groupId" > ? OR ("groupId" = ? AND "timestamp" > ?)} for the next page, |
| 1013 | + * comparing against the previous page's last row. |
| 1014 | + * |
| 1015 | + * <p>You can also use descending order, for example by passing {@code "NAME DESC"}. |
| 1016 | + * The generated paginated query will then use {@code "NAME" < ?} instead of {@code "NAME > ?} |
| 1017 | + * for the next page. |
| 1018 | + * |
| 1019 | + * <p>The standard SQL {@code FETCH NEXT ROWS} syntax is used for pagination. |
| 1020 | + * If {@code orderByKeys} is empty, the paginated query will use {@code OFFSET} keyword |
| 1021 | + * to specify the next page offset. |
| 1022 | + * |
| 1023 | + * <p>All column names are automatically double-quoted to avoid clashing with keywords or spaces |
| 1024 | + * in identifiers. If your DB is case sensitive, make sure you pass in the accurate case. |
| 1025 | + * |
| 1026 | + * <p>The method is lazy. Paginated queries are only sent when the returned stream is consumed. |
| 1027 | + * The caller should close the {@code connection} after done, which will close the cached |
| 1028 | + * JDBC statements used for pagination. |
| 1029 | + * |
| 1030 | + * @param settings to configure the {@code PreparedStatement} used for the paginated queries. |
| 1031 | + * @param orderByKeys for example {@code "firstName"} or {@code "group_id DESC"}. |
| 1032 | + * @param pageSize must be positive. |
| 1033 | + * @param resultType the record or class type to be constructed for each row. |
| 1034 | + * @return a lazy stream of pages. If the query returns no rows, the stream will be empty. |
| 1035 | + * @throws IllegalArgumentException if {@code pageSize} isn't positive. |
| 1036 | + * @throws NullPointerException if any parameter is null. |
| 1037 | + * @since 9.9.3 |
| 1038 | + */ |
| 1039 | + public <T> Stream<List<T>> paginateLazily( |
| 1040 | + Connection connection, |
| 1041 | + StatementSettings settings, |
| 1042 | + Collection<String> orderByKeys, |
| 1043 | + int pageSize, |
| 1044 | + Class<? extends T> resultType) { |
| 1045 | + return paginateLazily( |
| 1046 | + connection, settings, orderByKeys, pageSize, ResultMapper.toResultOf(resultType)::from); |
| 1047 | + } |
| 1048 | + |
| 1049 | + /** |
| 1050 | + * Paginate the encapsulated query by fetching up to {@code pageSize} rows for each page. |
| 1051 | + * |
| 1052 | + * <p>If {@code orderByKeys} is non-empty, the column values are used to control pagination. |
| 1053 | + * For example, if {@code [groupId, timestamp]} can uniquely identify a row, the generated |
| 1054 | + * SQL will use {@code "groupId" > ? OR ("groupId" = ? AND "timestamp" > ?)} for the next page, |
| 1055 | + * comparing against the previous page's last row. |
| 1056 | + * |
| 1057 | + * <p>You can also use descending order, for example by passing {@code "NAME DESC"}. |
| 1058 | + * The generated paginated query will then use {@code "NAME" < ?} instead of {@code "NAME > ?} |
| 1059 | + * for the next page. |
| 1060 | + * |
| 1061 | + * <p>The standard SQL {@code FETCH NEXT ROWS} syntax is used for pagination. |
| 1062 | + * If {@code orderByKeys} is empty, the paginated query will use {@code OFFSET} keyword |
| 1063 | + * to specify the next page offset. |
| 1064 | + * |
| 1065 | + * <p>All column names are automatically double-quoted to avoid clashing with keywords or spaces |
| 1066 | + * in identifiers. If your DB is case sensitive, make sure you pass in the accurate case. |
| 1067 | + * |
| 1068 | + * <p>The method is lazy. Paginated queries are only sent when the returned stream is consumed. |
| 1069 | + * The caller should close the {@code connection} after done, which will close the cached |
| 1070 | + * JDBC statements used for pagination. |
| 1071 | + * |
| 1072 | + * @param orderByKeys for example {@code "firstName"} or {@code "group_id DESC"}. |
| 1073 | + * @param pageSize must be positive. |
| 1074 | + * @param rowMapper maps each row (of type {@code ResultSet}) to a new result object. |
| 1075 | + * @return a lazy stream of pages. If the query returns no rows, the stream will be empty. |
| 1076 | + * @throws IllegalArgumentException if {@code pageSize} isn't positive. |
| 1077 | + * @throws NullPointerException if any parameter is null. |
| 1078 | + * @since 9.9.3 |
| 1079 | + */ |
| 1080 | + public <T> Stream<List<T>> paginateLazily( |
| 1081 | + Connection connection, |
| 1082 | + Collection<String> orderByKeys, |
| 1083 | + int pageSize, |
| 1084 | + SqlFunction<? super ResultSet, ? extends T> rowMapper) { |
| 1085 | + return paginateLazily(connection, stmt -> {}, orderByKeys, pageSize, rowMapper); |
| 1086 | + } |
| 1087 | + |
| 1088 | + /** |
| 1089 | + * Paginate the encapsulated query by fetching up to {@code pageSize} rows for each page. |
| 1090 | + * |
| 1091 | + * <p>If {@code orderByKeys} is non-empty, the column values are used to control pagination. |
| 1092 | + * For example, if {@code [groupId, timestamp]} can uniquely identify a row, the generated |
| 1093 | + * SQL will use {@code "groupId" > ? OR ("groupId" = ? AND "timestamp" > ?)} for the next page, |
| 1094 | + * comparing against the previous page's last row. |
| 1095 | + * |
| 1096 | + * <p>You can also use descending order, for example by passing {@code "NAME DESC"}. |
| 1097 | + * The generated paginated query will then use {@code "NAME" < ?} instead of {@code "NAME > ?} |
| 1098 | + * for the next page. |
| 1099 | + * |
| 1100 | + * <p>The standard SQL {@code FETCH NEXT ROWS} syntax is used for pagination. |
| 1101 | + * If {@code orderByKeys} is empty, the paginated query will use {@code OFFSET} keyword |
| 1102 | + * to specify the next page offset. |
| 1103 | + * |
| 1104 | + * <p>All column names are automatically double-quoted to avoid clashing with keywords or spaces |
| 1105 | + * in identifiers. If your DB is case sensitive, make sure you pass in the accurate case. |
| 1106 | + * |
| 1107 | + * <p>The method is lazy. Paginated queries are only sent when the returned stream is consumed. |
| 1108 | + * The caller should close the {@code connection} after done, which will close the cached |
| 1109 | + * JDBC statements used for pagination. |
| 1110 | + * |
| 1111 | + * @param settings to configure the {@code PreparedStatement} used for the paginated queries. |
| 1112 | + * @param orderByKeys for example {@code "firstName"} or {@code "group_id DESC"}. |
| 1113 | + * @param pageSize must be positive. |
| 1114 | + * @param rowMapper maps each row (of type {@code ResultSet}) to a new result object. |
| 1115 | + * @return a lazy stream of pages. If the query returns no rows, the stream will be empty. |
| 1116 | + * @throws IllegalArgumentException if {@code pageSize} isn't positive. |
| 1117 | + * @throws NullPointerException if any parameter is null. |
| 1118 | + * @since 9.9.3 |
| 1119 | + */ |
| 1120 | + public <T> Stream<List<T>> paginateLazily( |
| 1121 | + Connection connection, |
| 1122 | + StatementSettings settings, |
| 1123 | + Collection<String> orderByKeys, |
| 1124 | + int pageSize, |
| 1125 | + SqlFunction<? super ResultSet, ? extends T> rowMapper) { |
| 1126 | + requireNonNull(connection); |
| 1127 | + requireNonNull(settings); |
| 1128 | + requireNonNull(rowMapper); |
| 1129 | + List<OrderBy> orderByList = orderByKeys.stream().collect(OrderBy.parsingToDistinctOrderByList()); |
| 1130 | + checkArgument(pageSize > 0, "pageSize (%s) isn't positive", pageSize); |
| 1131 | + class Pagination extends Iteration<List<T>> { |
| 1132 | + Map<String, Object> lastPageKeys = new HashMap<>(); |
| 1133 | + int offset = 0; |
| 1134 | + final Template<List<T>> fetch = prepare( |
| 1135 | + connection, |
| 1136 | + "SELECT * FROM ({this})" |
| 1137 | + + " WHERE {seek} {order_by? -> ORDER BY order_by?}" |
| 1138 | + + " OFFSET {offset} ROWS FETCH NEXT {page_size} ROWS ONLY", |
| 1139 | + stmt -> { |
| 1140 | + try (ResultSet resultSet = stmt.executeQuery()) { |
| 1141 | + List<T> results = mapResults(resultSet, row -> { |
| 1142 | + lastPageKeys = new LinkedHashMap<>(); |
| 1143 | + for (OrderBy orderBy : orderByList) { |
| 1144 | + Object columnValue = row.getObject(orderBy.columnName); |
| 1145 | + if (columnValue == null) { |
| 1146 | + throw new NullPointerException( |
| 1147 | + "null value not supported for order by column " + orderBy.columnName); |
| 1148 | + } |
| 1149 | + lastPageKeys.put(orderBy.columnName, columnValue); |
| 1150 | + } |
| 1151 | + return rowMapper.apply(row); |
| 1152 | + }); |
| 1153 | + if (orderByList.isEmpty()) { |
| 1154 | + // If order by is present, we always seek and start from offset 0. |
| 1155 | + offset += results.size(); |
| 1156 | + } |
| 1157 | + return results; |
| 1158 | + } |
| 1159 | + }); |
| 1160 | + |
| 1161 | + Pagination() { |
| 1162 | + lazily(this::fetchPage); |
| 1163 | + } |
| 1164 | + |
| 1165 | + void fetchPage() { |
| 1166 | + SafeSql seek = lastPageKeys.isEmpty() ? TRUE : OrderBy.after(lastPageKeys, orderByList); |
| 1167 | + List<T> page = fetch.with( |
| 1168 | + SafeSql.this, |
| 1169 | + seek, |
| 1170 | + orderByList.stream().map(OrderBy::sql).collect(toList()), |
| 1171 | + offset, |
| 1172 | + pageSize); |
| 1173 | + if (page.isEmpty()) { // no more data |
| 1174 | + return; |
| 1175 | + } |
| 1176 | + emit(page); |
| 1177 | + if (page.size() >= pageSize) { |
| 1178 | + lazily(this::fetchPage); |
| 1179 | + } |
| 1180 | + } |
| 1181 | + } |
| 1182 | + return new Pagination().iterate(); |
| 1183 | + } |
962 | 1184 |
|
963 | 1185 | /** |
964 | 1186 | * Executes the encapsulated SQL as a query against {@code connection}, |
|
0 commit comments