Skip to content

Commit 136ca3b

Browse files
author
Rahil Chertara
committed
get rebase to compile
1 parent 21d0fa3 commit 136ca3b

11 files changed

+158
-110
lines changed

core/src/main/java/org/apache/iceberg/ContentFileParser.java

-49
Original file line numberDiff line numberDiff line change
@@ -274,55 +274,6 @@ static PartitionData partitionDataFromRawValue(JsonNode rawPartitionValue, Parti
274274
return partitionData;
275275
}
276276

277-
static void unboundContentFileToJson(
278-
ContentFile<?> contentFile, PartitionSpec spec, JsonGenerator generator) throws IOException {
279-
Preconditions.checkArgument(contentFile != null, "Invalid content file: null");
280-
Preconditions.checkArgument(spec != null, "Invalid partition spec: null");
281-
Preconditions.checkArgument(generator != null, "Invalid JSON generator: null");
282-
Preconditions.checkArgument(
283-
contentFile.specId() == spec.specId(),
284-
"Invalid partition spec id from content file: expected = %s, actual = %s",
285-
spec.specId(),
286-
contentFile.specId());
287-
288-
generator.writeStartObject();
289-
// ignore the ordinal position (ContentFile#pos) of the file in a manifest,
290-
// as it isn't used and BaseFile constructor doesn't support it.
291-
292-
generator.writeNumberField(SPEC_ID, contentFile.specId());
293-
generator.writeStringField(CONTENT, contentFile.content().name());
294-
generator.writeStringField(FILE_PATH, contentFile.path().toString());
295-
generator.writeStringField(FILE_FORMAT, contentFile.format().name());
296-
297-
if (contentFile.partition() != null) {
298-
generator.writeFieldName(PARTITION);
299-
SingleValueParser.toJson(spec.partitionType(), contentFile.partition(), generator);
300-
}
301-
302-
generator.writeNumberField(FILE_SIZE, contentFile.fileSizeInBytes());
303-
304-
metricsToJson(contentFile, generator);
305-
306-
if (contentFile.keyMetadata() != null) {
307-
generator.writeFieldName(KEY_METADATA);
308-
SingleValueParser.toJson(DataFile.KEY_METADATA.type(), contentFile.keyMetadata(), generator);
309-
}
310-
311-
if (contentFile.splitOffsets() != null) {
312-
JsonUtil.writeLongArray(SPLIT_OFFSETS, contentFile.splitOffsets(), generator);
313-
}
314-
315-
if (contentFile.equalityFieldIds() != null) {
316-
JsonUtil.writeIntegerArray(EQUALITY_IDS, contentFile.equalityFieldIds(), generator);
317-
}
318-
319-
if (contentFile.sortOrderId() != null) {
320-
generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId());
321-
}
322-
323-
generator.writeEndObject();
324-
}
325-
326277
private static void metricsToJson(ContentFile<?> contentFile, JsonGenerator generator)
327278
throws IOException {
328279
generator.writeNumberField(RECORD_COUNT, contentFile.recordCount());

core/src/main/java/org/apache/iceberg/UnboundBaseFileScanTask.java

+71-9
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,15 @@
2020

2121
import org.apache.iceberg.expressions.Expression;
2222
import org.apache.iceberg.expressions.ResidualEvaluator;
23+
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
2324

24-
class UnboundBaseFileScanTask extends BaseFileScanTask {
25-
private UnboundGenericDataFile unboundDataFile;
26-
private UnboundGenericDeleteFile[] unboundDeleteFiles;
25+
public class UnboundBaseFileScanTask extends BaseFileScanTask {
26+
private DataFile unboundDataFile;
27+
private DeleteFile[] unboundDeleteFiles;
2728
private Expression filter;
2829

29-
UnboundBaseFileScanTask(
30-
UnboundGenericDataFile unboundDataFile,
31-
UnboundGenericDeleteFile[] unboundDeleteFiles,
32-
Expression filter) {
30+
public UnboundBaseFileScanTask(
31+
DataFile unboundDataFile, DeleteFile[] unboundDeleteFiles, Expression filter) {
3332
super(unboundDataFile, unboundDeleteFiles, null, null, ResidualEvaluator.unpartitioned(filter));
3433
this.unboundDataFile = unboundDataFile;
3534
this.unboundDeleteFiles = unboundDeleteFiles;
@@ -46,11 +45,74 @@ public PartitionSpec spec() {
4645
throw new UnsupportedOperationException("spec() is not supported in UnboundBaseFileScanTask");
4746
}
4847

48+
@Override
49+
public String toString() {
50+
return MoreObjects.toStringHelper(this)
51+
.add("unboundDataFile", unboundDataFile)
52+
.add("unboundDeleteFiles", unboundDeleteFiles)
53+
.add("filter", filter)
54+
.toString();
55+
}
56+
4957
public FileScanTask bind(PartitionSpec spec, boolean caseSensitive) {
50-
GenericDataFile boundDataFile = unboundDataFile.bindToSpec(spec);
58+
// TODO before creating a new task
59+
// need to ensure that dataFile is refreshed with correct partitionData using spec
60+
// need to ensure deleteFiles is refreshed with spec info
61+
// need to ensure residual refreshed with spec.
62+
63+
Metrics dataFileMetrics =
64+
new Metrics(
65+
unboundDataFile.recordCount(),
66+
unboundDataFile.columnSizes(),
67+
unboundDataFile.valueCounts(),
68+
unboundDataFile.nullValueCounts(),
69+
unboundDataFile.nanValueCounts());
70+
PartitionData partitionData = new PartitionData(spec.partitionType());
71+
72+
GenericDataFile boundDataFile =
73+
new GenericDataFile(
74+
spec.specId(),
75+
(String) unboundDataFile.path(),
76+
unboundDataFile.format(),
77+
partitionData,
78+
unboundDataFile.fileSizeInBytes(),
79+
dataFileMetrics,
80+
unboundDataFile.keyMetadata(),
81+
unboundDataFile.splitOffsets(),
82+
unboundDataFile.sortOrderId());
83+
5184
DeleteFile[] boundDeleteFiles = new DeleteFile[unboundDeleteFiles.length];
5285
for (int i = 0; i < unboundDeleteFiles.length; i++) {
53-
boundDeleteFiles[i] = unboundDeleteFiles[i].bindToSpec(spec);
86+
DeleteFile deleteFile = unboundDeleteFiles[i];
87+
Metrics deleteFileMetrics =
88+
new Metrics(
89+
deleteFile.recordCount(),
90+
deleteFile.columnSizes(),
91+
deleteFile.valueCounts(),
92+
deleteFile.nullValueCounts(),
93+
deleteFile.nanValueCounts());
94+
95+
int[] equalityDeletes = null;
96+
if (deleteFile.equalityFieldIds() != null) {
97+
equalityDeletes =
98+
deleteFile.equalityFieldIds().stream().mapToInt(Integer::intValue).toArray();
99+
}
100+
101+
DeleteFile genericDeleteFile =
102+
new GenericDeleteFile(
103+
spec.specId(),
104+
deleteFile.content(),
105+
(String) deleteFile.path(),
106+
deleteFile.format(),
107+
partitionData,
108+
deleteFile.fileSizeInBytes(),
109+
deleteFileMetrics,
110+
equalityDeletes,
111+
deleteFile.sortOrderId(),
112+
deleteFile.splitOffsets(),
113+
deleteFile.keyMetadata());
114+
115+
boundDeleteFiles[i] = genericDeleteFile;
54116
}
55117

56118
String schemaString = SchemaParser.toJson(spec.schema());

core/src/main/java/org/apache/iceberg/rest/responses/FetchPlanningResultResponse.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,8 @@
2525
import org.apache.iceberg.PartitionSpec;
2626
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
2727
import org.apache.iceberg.rest.PlanStatus;
28-
import org.apache.iceberg.rest.RESTResponse;
2928

30-
public class FetchPlanningResultResponse implements RESTResponse {
29+
public class FetchPlanningResultResponse implements TableScanResponse {
3130
private final PlanStatus planStatus;
3231
private final List<String> planTasks;
3332
private final List<FileScanTask> fileScanTasks;

core/src/main/java/org/apache/iceberg/rest/responses/FetchScanTasksResponse.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@
2424
import org.apache.iceberg.FileScanTask;
2525
import org.apache.iceberg.PartitionSpec;
2626
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
27-
import org.apache.iceberg.rest.RESTResponse;
2827

29-
public class FetchScanTasksResponse implements RESTResponse {
28+
public class FetchScanTasksResponse implements TableScanResponse {
3029
private final List<String> planTasks;
3130
private final List<FileScanTask> fileScanTasks;
3231
private final List<DeleteFile> deleteFiles;

core/src/main/java/org/apache/iceberg/rest/responses/PlanTableScanResponse.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
import org.apache.iceberg.PartitionSpec;
2626
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
2727
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
28+
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
2829
import org.apache.iceberg.rest.PlanStatus;
29-
import org.apache.iceberg.rest.RESTResponse;
3030

31-
public class PlanTableScanResponse implements RESTResponse {
31+
public class PlanTableScanResponse implements TableScanResponse {
3232
private final PlanStatus planStatus;
3333
private final String planId;
3434
private final List<String> planTasks;
@@ -48,8 +48,7 @@ private PlanTableScanResponse(
4848
this.planTasks = planTasks;
4949
this.fileScanTasks = fileScanTasks;
5050
this.deleteFiles = deleteFiles;
51-
this.specsById = specsById;
52-
validate();
51+
this.specsById = specsById != null ? specsById : Maps.newHashMap();
5352
}
5453

5554
public PlanStatus planStatus() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.rest.responses;
20+
21+
import org.apache.iceberg.rest.RESTResponse;
22+
23+
public interface TableScanResponse extends RESTResponse {}

core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java

+14-11
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,9 @@ public <T extends RESTResponse> T handleRequest(
536536
{
537537
TableIdentifier ident = tableIdentFromPathVars(vars);
538538
PlanTableScanRequest request = castRequest(PlanTableScanRequest.class, body);
539-
TableScan tableScan = catalog.loadTable(ident).newScan();
539+
Table table = catalog.loadTable(ident);
540+
TableScan tableScan = table.newScan();
541+
540542
if (request.snapshotId() != null) {
541543
tableScan.useSnapshot(request.snapshotId());
542544
}
@@ -558,9 +560,10 @@ public <T extends RESTResponse> T handleRequest(
558560
if (ident.equals(TABLE_COMPLETED_WITH_FILE_SCAN_TASK)) {
559561
return castResponse(
560562
responseType,
561-
new PlanTableScanResponse.Builder()
563+
PlanTableScanResponse.builder()
562564
.withPlanStatus(PlanStatus.COMPLETED)
563565
.withFileScanTasks(fileScanTasks)
566+
.withSpecsById(table.specs())
564567
.build());
565568
}
566569

@@ -571,9 +574,10 @@ public <T extends RESTResponse> T handleRequest(
571574
planToFileScanTasks.put(planId, fileScanTasks);
572575
return castResponse(
573576
responseType,
574-
new PlanTableScanResponse.Builder()
577+
PlanTableScanResponse.builder()
575578
.withPlanId(planId)
576579
.withPlanStatus(PlanStatus.SUBMITTED)
580+
// .withSpecsById(table.specs())
577581
.build());
578582
}
579583

@@ -585,9 +589,10 @@ public <T extends RESTResponse> T handleRequest(
585589
planTasks.forEach(task -> planToFileScanTasks.put(task, fileScanTasks));
586590
return castResponse(
587591
responseType,
588-
new PlanTableScanResponse.Builder()
592+
PlanTableScanResponse.builder()
589593
.withPlanStatus(PlanStatus.COMPLETED)
590594
.withPlanTasks(planTasks)
595+
// .withSpecsById(table.specs())
591596
.build());
592597
}
593598

@@ -608,7 +613,7 @@ public <T extends RESTResponse> T handleRequest(
608613

609614
return castResponse(
610615
responseType,
611-
new PlanTableScanResponse.Builder()
616+
PlanTableScanResponse.builder()
612617
.withPlanStatus(PlanStatus.COMPLETED)
613618
.withPlanTasks(outerPlanTasks)
614619
.build());
@@ -623,7 +628,7 @@ public <T extends RESTResponse> T handleRequest(
623628
String planId = planIDFromPathVars(vars);
624629
return castResponse(
625630
responseType,
626-
new FetchPlanningResultResponse.Builder()
631+
FetchPlanningResultResponse.builder()
627632
.withPlanStatus(PlanStatus.fromName("completed"))
628633
.withFileScanTasks(planToFileScanTasks.get(planId))
629634
.build());
@@ -638,7 +643,7 @@ public <T extends RESTResponse> T handleRequest(
638643
if (ident.equals(TABLE_COMPLETED_WITH_PLAN_TASK)) {
639644
return castResponse(
640645
responseType,
641-
new FetchScanTasksResponse.Builder()
646+
FetchScanTasksResponse.builder()
642647
.withFileScanTasks(planToFileScanTasks.get(request.planTask()))
643648
.build());
644649
}
@@ -649,9 +654,7 @@ public <T extends RESTResponse> T handleRequest(
649654
String innerPlanTask = planToPlanTasks.remove(request.planTask());
650655
return castResponse(
651656
responseType,
652-
new FetchScanTasksResponse.Builder()
653-
.withPlanTasks(List.of(innerPlanTask))
654-
.build());
657+
FetchScanTasksResponse.builder().withPlanTasks(List.of(innerPlanTask)).build());
655658
}
656659

657660
if (planToFileScanTasks.containsKey(request.planTask())) {
@@ -660,7 +663,7 @@ public <T extends RESTResponse> T handleRequest(
660663
planToFileScanTasks.remove(request.planTask());
661664
return castResponse(
662665
responseType,
663-
new FetchScanTasksResponse.Builder()
666+
FetchScanTasksResponse.builder()
664667
.withFileScanTasks(fileScanTasksFromPlanTask)
665668
.build());
666669
}

0 commit comments

Comments
 (0)