Skip to content

Commit 7614820

Browse files
committed
improve code
1 parent f81f123 commit 7614820

File tree

8 files changed

+48
-67
lines changed

8 files changed

+48
-67
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/table/getter/PartitionGetter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import java.util.List;
2525

26-
import static com.alibaba.fluss.metadata.ResolvedPartitionSpec.fromPartitionValue;
2726
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
2827
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
2928

@@ -62,7 +61,7 @@ public String getPartition(InternalRow row) {
6261
Object partitionValue = partitionFieldGetter.getFieldOrNull(row);
6362
checkNotNull(partitionValue, "Partition value shouldn't be null.");
6463
ResolvedPartitionSpec resolvedPartitionSpec =
65-
fromPartitionValue(partitionKey, partitionValue.toString());
64+
ResolvedPartitionSpec.fromPartitionValue(partitionKey, partitionValue.toString());
6665
return resolvedPartitionSpec.getPartitionName();
6766
}
6867
}

fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@
2727
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
2828

2929
/**
30-
* Represents the resolved {@link PartitionSpec}, which the partition spec is re-arranged to the
31-
* correct order by comparing with a list of strictly ordered partition keys.
30+
* Represents a partition, which is the resolved version of {@link PartitionSpec}. The partition
31+
* spec is re-arranged into the correct order by comparing it with a list of strictly ordered
32+
* partition keys.
3233
*
3334
* @since 0.6
3435
*/
@@ -38,15 +39,13 @@ public class ResolvedPartitionSpec {
3839

3940
private final List<String> partitionKeys;
4041
private final List<String> partitionValues;
41-
private final String partitionName;
4242

4343
private ResolvedPartitionSpec(List<String> partitionKeys, List<String> partitionValues) {
4444
checkArgument(
4545
partitionKeys.size() == partitionValues.size(),
4646
"The number of partition keys and partition values should be the same.");
4747
this.partitionKeys = partitionKeys;
4848
this.partitionValues = partitionValues;
49-
this.partitionName = generatePartitionName();
5049
}
5150

5251
public static ResolvedPartitionSpec fromPartitionSpec(
@@ -74,10 +73,6 @@ public List<String> getPartitionValues() {
7473
return partitionValues;
7574
}
7675

77-
public String getPartitionName() {
78-
return partitionName;
79-
}
80-
8176
/**
8277
* Generate the partition name for a partition table of specify partition values.
8378
*
@@ -100,7 +95,7 @@ public String getPartitionName() {
10095
* <p>For example, if the partition keys are [a], and the partition value is [1], the partition
10196
* name will be "1".
10297
*/
103-
public String generatePartitionName() {
98+
public String getPartitionName() {
10499
return String.join(PARTITION_SPEC_SEPARATOR, partitionValues);
105100
}
106101

@@ -127,12 +122,7 @@ public String getPartitionQualifiedName() {
127122

128123
@Override
129124
public String toString() {
130-
return "ResolvedPartitionSpec{"
131-
+ "partitionKeys="
132-
+ partitionKeys
133-
+ ", partitionValues="
134-
+ partitionValues
135-
+ '}';
125+
return getPartitionQualifiedName();
136126
}
137127

138128
private static List<String> getReorderedPartitionValues(

fluss-common/src/main/java/com/alibaba/fluss/utils/PartitionUtils.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.List;
3232
import java.util.Map;
3333

34-
import static com.alibaba.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;
3534
import static com.alibaba.fluss.metadata.TablePath.detectInvalidName;
3635

3736
/** Utils for partition. */
@@ -95,7 +94,7 @@ static void validatePartitionValues(List<String> partitionValues) {
9594
* @param timeUnit the time unit
9695
* @return the resolved partition spec
9796
*/
98-
public static ResolvedPartitionSpec genAutoPartitionResolvedPartitionSpec(
97+
public static ResolvedPartitionSpec generateAutoPartition(
9998
List<String> partitionKeys,
10099
ZonedDateTime current,
101100
int offset,
@@ -122,7 +121,7 @@ public static ResolvedPartitionSpec genAutoPartitionResolvedPartitionSpec(
122121
throw new IllegalArgumentException("Unsupported time unit: " + timeUnit);
123122
}
124123

125-
return fromPartitionName(partitionKeys, autoPartitionFieldSpec);
124+
return ResolvedPartitionSpec.fromPartitionName(partitionKeys, autoPartitionFieldSpec);
126125
}
127126

128127
private static String getFormattedTime(ZonedDateTime zonedDateTime, String format) {

fluss-common/src/test/java/com/alibaba/fluss/utils/PartitionUtilsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
3636
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
37-
import static com.alibaba.fluss.utils.PartitionUtils.genAutoPartitionResolvedPartitionSpec;
37+
import static com.alibaba.fluss.utils.PartitionUtils.generateAutoPartition;
3838
import static com.alibaba.fluss.utils.PartitionUtils.validatePartitionSpec;
3939
import static com.alibaba.fluss.utils.PartitionUtils.validatePartitionValues;
4040
import static org.assertj.core.api.Assertions.assertThat;
@@ -137,7 +137,7 @@ void testGenerateAutoPartitionName(
137137
String[] expected) {
138138
for (int i = 0; i < offsets.length; i++) {
139139
ResolvedPartitionSpec resolvedPartitionSpec =
140-
genAutoPartitionResolvedPartitionSpec(
140+
generateAutoPartition(
141141
Collections.singletonList("dt"),
142142
zonedDateTime,
143143
offsets[i],

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@
5757
import java.util.concurrent.locks.Lock;
5858
import java.util.concurrent.locks.ReentrantLock;
5959

60-
import static com.alibaba.fluss.metadata.ResolvedPartitionSpec.fromPartitionName;
61-
import static com.alibaba.fluss.utils.PartitionUtils.genAutoPartitionResolvedPartitionSpec;
60+
import static com.alibaba.fluss.utils.PartitionUtils.generateAutoPartition;
6261
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
6362
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
6463

@@ -232,7 +231,7 @@ private void createPartitions(
232231
}
233232

234233
TablePath tablePath = tableInfo.getTablePath();
235-
for (ResolvedPartitionSpec resolvedPartitionSpec : partitionsToPreCreate) {
234+
for (ResolvedPartitionSpec partition : partitionsToPreCreate) {
236235
long tableId = tableInfo.getTableId();
237236
int replicaFactor = tableInfo.getTableConfig().getReplicationFactor();
238237
int[] servers = metadataCache.getLiveServerIds();
@@ -245,20 +244,17 @@ private void createPartitions(
245244

246245
try {
247246
metadataManager.createPartition(
248-
tablePath, tableId, partitionAssignment, resolvedPartitionSpec, false);
247+
tablePath, tableId, partitionAssignment, partition, false);
249248
} catch (PartitionAlreadyExistsException e) {
250-
LOG.error(
251-
"Partition {} already exists for table {} when auto create partitions.",
252-
resolvedPartitionSpec.getPartitionQualifiedName(),
253-
tablePath,
254-
e);
249+
LOG.info(
250+
"Auto partitioning skip to create partition {} for table [{}] as the partition is exist.",
251+
partition,
252+
tablePath);
255253
}
256254

257-
currentPartitions.add(resolvedPartitionSpec.getPartitionName());
255+
currentPartitions.add(partition.getPartitionName());
258256
LOG.info(
259-
"Auto partitioning created partition {} for table [{}].",
260-
resolvedPartitionSpec.getPartitionQualifiedName(),
261-
tablePath);
257+
"Auto partitioning created partition {} for table [{}].", partition, tablePath);
262258
}
263259
}
264260

@@ -275,12 +271,12 @@ private List<ResolvedPartitionSpec> partitionNamesToPreCreate(
275271
int partitionToPreCreate = autoPartitionStrategy.numPreCreate();
276272
List<ResolvedPartitionSpec> partitionsToCreate = new ArrayList<>();
277273
for (int idx = 0; idx < partitionToPreCreate; idx++) {
278-
ResolvedPartitionSpec resolvedPartitionSpec =
279-
genAutoPartitionResolvedPartitionSpec(
274+
ResolvedPartitionSpec partition =
275+
generateAutoPartition(
280276
partitionKeys, currentZonedDateTime, idx, autoPartitionTimeUnit);
281277
// if the partition already exists, we don't need to create it, otherwise, create it
282-
if (!currentPartitions.contains(resolvedPartitionSpec.getPartitionName())) {
283-
partitionsToCreate.add(resolvedPartitionSpec);
278+
if (!currentPartitions.contains(partition.getPartitionName())) {
279+
partitionsToCreate.add(partition);
284280
}
285281
}
286282
return partitionsToCreate;
@@ -303,30 +299,29 @@ private void dropPartitions(
303299
currentInstant, autoPartitionStrategy.timeZone().toZoneId());
304300

305301
// get the earliest one partition that need to retain
306-
ResolvedPartitionSpec lastRetainResolvedPartitionSpec =
307-
genAutoPartitionResolvedPartitionSpec(
302+
ResolvedPartitionSpec lastRetainPartition =
303+
generateAutoPartition(
308304
partitionKeys,
309305
currentZonedDateTime,
310306
-numToRetain,
311307
autoPartitionStrategy.timeUnit());
312308

313309
Iterator<String> partitionsToExpire =
314-
currentPartitions
315-
.headSet(lastRetainResolvedPartitionSpec.getPartitionName(), false)
316-
.iterator();
310+
currentPartitions.headSet(lastRetainPartition.getPartitionName(), false).iterator();
317311

318312
while (partitionsToExpire.hasNext()) {
319313
String partitionName = partitionsToExpire.next();
320314
// drop the partition
321315
try {
322316
metadataManager.dropPartition(
323-
tablePath, fromPartitionName(partitionKeys, partitionName), false);
317+
tablePath,
318+
ResolvedPartitionSpec.fromPartitionName(partitionKeys, partitionName),
319+
false);
324320
} catch (PartitionNotExistException e) {
325-
LOG.error(
326-
"Partition {} does not exist for table {} when auto drop partitions.",
321+
LOG.info(
322+
"Auto partitioning skip to delete partition {} for table [{}] as the partition is not exist.",
327323
partitionName,
328-
tablePath,
329-
e);
324+
tablePath);
330325
}
331326

332327
// only remove when zk success, this reflects to the partitionsByTable

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@
7676
import java.util.concurrent.CompletableFuture;
7777
import java.util.function.Supplier;
7878

79-
import static com.alibaba.fluss.metadata.ResolvedPartitionSpec.fromPartitionSpec;
8079
import static com.alibaba.fluss.server.utils.RpcMessageUtils.getCommitLakeTableSnapshotData;
8180
import static com.alibaba.fluss.server.utils.RpcMessageUtils.getPartitionSpec;
8281
import static com.alibaba.fluss.server.utils.RpcMessageUtils.toTablePath;
@@ -250,8 +249,9 @@ public CompletableFuture<CreatePartitionResponse> createPartition(
250249
// first, validate the partition spec, and get resolved partition spec.
251250
PartitionSpec partitionSpec = getPartitionSpec(request.getPartitionSpec());
252251
validatePartitionSpec(tableInfo, partitionSpec);
253-
ResolvedPartitionSpec resolvedPartitionSpec =
254-
fromPartitionSpec(tableInfo.getPartitionKeys(), partitionSpec);
252+
ResolvedPartitionSpec partitionToCreate =
253+
ResolvedPartitionSpec.fromPartitionSpec(
254+
tableInfo.getPartitionKeys(), partitionSpec);
255255

256256
// second, generate the PartitionAssignment.
257257
int replicaFactor = tableInfo.getTableConfig().getReplicationFactor();
@@ -267,7 +267,7 @@ public CompletableFuture<CreatePartitionResponse> createPartition(
267267
tableInfo.getTablePath(),
268268
tableInfo.getTableId(),
269269
partitionAssignment,
270-
resolvedPartitionSpec,
270+
partitionToCreate,
271271
request.isIgnoreIfNotExists());
272272
return CompletableFuture.completedFuture(response);
273273
}
@@ -285,11 +285,11 @@ public CompletableFuture<DropPartitionResponse> dropPartition(DropPartitionReque
285285
// first, validate the partition spec.
286286
PartitionSpec partitionSpec = getPartitionSpec(request.getPartitionSpec());
287287
validatePartitionSpec(tableInfo, partitionSpec);
288-
ResolvedPartitionSpec resolvedPartitionSpec =
289-
fromPartitionSpec(tableInfo.getPartitionKeys(), partitionSpec);
288+
ResolvedPartitionSpec partitionToDrop =
289+
ResolvedPartitionSpec.fromPartitionSpec(
290+
tableInfo.getPartitionKeys(), partitionSpec);
290291

291-
metadataManager.dropPartition(
292-
tablePath, resolvedPartitionSpec, request.isIgnoreIfNotExists());
292+
metadataManager.dropPartition(tablePath, partitionToDrop, request.isIgnoreIfNotExists());
293293
return CompletableFuture.completedFuture(response);
294294
}
295295

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -324,9 +324,9 @@ public void createPartition(
324324
TablePath tablePath,
325325
long tableId,
326326
PartitionAssignment partitionAssignment,
327-
ResolvedPartitionSpec resolvedPartitionSpec,
327+
ResolvedPartitionSpec partition,
328328
boolean ignoreIfExists) {
329-
String partitionName = resolvedPartitionSpec.getPartitionName();
329+
String partitionName = partition.getPartitionName();
330330
Optional<TablePartition> optionalTablePartition =
331331
getOptionalTablePartition(tablePath, partitionName);
332332
if (optionalTablePartition.isPresent()) {
@@ -336,7 +336,7 @@ public void createPartition(
336336
throw new PartitionAlreadyExistsException(
337337
String.format(
338338
"Partition '%s' already exists for table %s",
339-
resolvedPartitionSpec.getPartitionQualifiedName(), tablePath));
339+
partition.getPartitionQualifiedName(), tablePath));
340340
}
341341

342342
try {
@@ -357,10 +357,8 @@ public void createPartition(
357357
}
358358

359359
public void dropPartition(
360-
TablePath tablePath,
361-
ResolvedPartitionSpec resolvedPartitionSpec,
362-
boolean ignoreIfNotExists) {
363-
String partitionName = resolvedPartitionSpec.getPartitionName();
360+
TablePath tablePath, ResolvedPartitionSpec partition, boolean ignoreIfNotExists) {
361+
String partitionName = partition.getPartitionName();
364362
Optional<TablePartition> optionalTablePartition =
365363
getOptionalTablePartition(tablePath, partitionName);
366364
if (!optionalTablePartition.isPresent()) {
@@ -371,7 +369,7 @@ public void dropPartition(
371369
throw new PartitionNotExistException(
372370
String.format(
373371
"Partition '%s' does not exist for table %s",
374-
resolvedPartitionSpec.getPartitionQualifiedName(), tablePath));
372+
partition.getPartitionQualifiedName(), tablePath));
375373
}
376374

377375
try {

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
import static com.alibaba.fluss.server.utils.RpcMessageUtils.toTablePath;
8989
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
9090
import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;
91-
import static com.alibaba.fluss.utils.PartitionUtils.genAutoPartitionResolvedPartitionSpec;
91+
import static com.alibaba.fluss.utils.PartitionUtils.generateAutoPartition;
9292
import static org.assertj.core.api.Assertions.assertThat;
9393
import static org.assertj.core.api.Assertions.assertThatThrownBy;
9494

@@ -605,7 +605,7 @@ public static List<String> getExpectAddedPartitions(
605605
List<String> partitions = new ArrayList<>();
606606
for (int i = 0; i < newPartitions; i++) {
607607
partitions.add(
608-
genAutoPartitionResolvedPartitionSpec(partitionKeys, addDateTime, i, timeUnit)
608+
generateAutoPartition(partitionKeys, addDateTime, i, timeUnit)
609609
.getPartitionName());
610610
}
611611
return partitions;

0 commit comments

Comments
 (0)