|
35 | 35 | import org.apache.fluss.metadata.TableInfo; |
36 | 36 | import org.apache.fluss.metadata.TablePath; |
37 | 37 | import org.apache.fluss.types.RowType; |
| 38 | +import java.util.concurrent.ExecutionException; |
38 | 39 |
|
39 | 40 | import javax.annotation.Nullable; |
40 | 41 |
|
@@ -79,7 +80,7 @@ public Scan project(List<String> projectedColumnNames) { |
79 | 80 | int index = rowType.getFieldIndex(projectedColumnNames.get(i)); |
80 | 81 | if (index < 0) { |
81 | 82 | throw new IllegalArgumentException( |
82 | | - "Field " + projectedColumnNames.get(i) + " not found in table schema."); |
| 83 | + String.format("Field %s not found in table schema.", projectedColumnNames.get(i))); |
83 | 84 | } |
84 | 85 | columnIndexes[i] = index; |
85 | 86 | } |
@@ -119,16 +120,42 @@ public BatchScanner createBatchScanner(TableBucket tableBucket) { |
119 | 120 | public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId) { |
120 | 121 | if (limit != null) { |
121 | 122 | throw new UnsupportedOperationException( |
122 | | - "Currently, SnapshotBatchScanner doesn't support limit pushdown."); |
| 123 | + String.format( |
| 124 | + "SnapshotBatchScanner does not support limit pushdown. Received limit=%s. Remove limit() from the scan or use createBatchScanner(TableBucket) for limited reads.", |
| 125 | + limit)); |
| 126 | + } |
| 127 | + if (!tableInfo.hasPrimaryKey()) { |
| 128 | + throw new UnsupportedOperationException( |
| 129 | + String.format( |
| 130 | + "Snapshot-based batch scanning is only supported for Primary Key tables. Table %s does not have a primary key.", |
| 131 | + tableInfo.getTablePath())); |
123 | 132 | } |
124 | 133 | String scannerTmpDir = |
125 | 134 | conn.getConfiguration().getString(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR); |
126 | 135 | Admin admin = conn.getAdmin(); |
127 | 136 | final KvSnapshotMetadata snapshotMeta; |
128 | 137 | try { |
129 | 138 | snapshotMeta = admin.getKvSnapshotMetadata(tableBucket, snapshotId).get(); |
| 139 | + } catch (InterruptedException ie) { |
| 140 | + Thread.currentThread().interrupt(); |
| 141 | + throw new FlussRuntimeException( |
| 142 | + String.format( |
| 143 | + "Interrupted while fetching snapshot metadata for table %s, %s, snapshotId=%d", |
| 144 | + tableInfo.getTablePath(), tableBucket, snapshotId), |
| 145 | + ie); |
| 146 | + } catch (ExecutionException ee) { |
| 147 | + Throwable cause = ee.getCause() == null ? ee : ee.getCause(); |
| 148 | + throw new FlussRuntimeException( |
| 149 | + String.format( |
| 150 | + "Failed to get snapshot metadata for table %s, %s, snapshotId=%d: %s", |
| 151 | + tableInfo.getTablePath(), tableBucket, snapshotId, cause.getMessage()), |
| 152 | + cause); |
130 | 153 | } catch (Exception e) { |
131 | | - throw new FlussRuntimeException("Failed to get snapshot metadata", e); |
| 154 | + throw new FlussRuntimeException( |
| 155 | + String.format( |
| 156 | + "Unexpected error while getting snapshot metadata for table %s, %s, snapshotId=%d", |
| 157 | + tableInfo.getTablePath(), tableBucket, snapshotId), |
| 158 | + e); |
132 | 159 | } |
133 | 160 |
|
134 | 161 | return new KvSnapshotBatchScanner( |
@@ -170,7 +197,10 @@ public BatchScanner createBatchScanner(String partitionName) { |
170 | 197 | } |
171 | 198 | Long pid = conn.getMetadataUpdater().getPartitionId(physical).orElse(null); |
172 | 199 | if (pid == null) { |
173 | | - throw new IllegalStateException("Partition id not found for " + partitionName); |
| 200 | + throw new IllegalStateException( |
| 201 | + String.format( |
| 202 | + "Partition ID not found for partition '%s' in table %s. Metadata may be stale. Try refreshing metadata and ensure the partition exists.", |
| 203 | + partitionName, tableInfo.getTablePath())); |
174 | 204 | } |
175 | 205 | return new FullScanBatchScanner(tableInfo, conn.getMetadataUpdater(), pid); |
176 | 206 | } |
@@ -202,7 +232,7 @@ public BatchScanner createFullScanBatchScanner(String partitionName) { |
202 | 232 | } |
203 | 233 | Long pid = conn.getMetadataUpdater().getPartitionId(physical).orElse(null); |
204 | 234 | if (pid == null) { |
205 | | - throw new IllegalStateException("Partition id not found for " + partitionName); |
| 235 | + throw new IllegalStateException(String.format("Partition id not found for %s", partitionName)); |
206 | 236 | } |
207 | 237 | return new FullScanBatchScanner(tableInfo, conn.getMetadataUpdater(), pid); |
208 | 238 | } |
|
0 commit comments