diff --git a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java index 0990756ce5..ca1752bca6 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java @@ -214,6 +214,9 @@ public void close() throws Exception { securityTokenManager.stop(); } + // Close metadata updater to release scheduler and pending requests + metadataUpdater.close(); + clientMetricGroup.close(); rpcClient.close(); metricRegistry.closeAsync().get(); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java index 6c32647636..4f5bad3ecb 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java @@ -28,6 +28,8 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.encode.KeyEncoder; import org.apache.fluss.types.RowType; +import org.apache.fluss.utils.ExceptionUtils; +import org.apache.fluss.utils.concurrent.FutureUtils; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -35,7 +37,7 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; -import static org.apache.fluss.client.utils.ClientUtils.getPartitionId; +import static org.apache.fluss.client.utils.ClientUtils.getPartitionIdAsync; import static org.apache.fluss.utils.Preconditions.checkArgument; /** An implementation of {@link Lookuper} that lookups by primary key. */ @@ -89,44 +91,68 @@ public PrimaryKeyLookuper( @Override public CompletableFuture lookup(InternalRow lookupKey) { - // encoding the key row using a compacted way consisted with how the key is encoded when put - // a row - byte[] pkBytes = primaryKeyEncoder.encodeKey(lookupKey); - byte[] bkBytes = - bucketKeyEncoder == primaryKeyEncoder - ? pkBytes - : bucketKeyEncoder.encodeKey(lookupKey); - Long partitionId = null; - if (partitionGetter != null) { - try { - partitionId = - getPartitionId( + try { + // encoding the key row using a compacted way consisted with how the key is encoded when + // put a row + byte[] pkBytes = primaryKeyEncoder.encodeKey(lookupKey); + byte[] bkBytes = + bucketKeyEncoder == primaryKeyEncoder + ? 
pkBytes + : bucketKeyEncoder.encodeKey(lookupKey); + + // If partition getter is present, we need to get partition ID asynchronously + if (partitionGetter != null) { + // Use async version to avoid blocking Netty IO threads + return getPartitionIdAsync( lookupKey, partitionGetter, tableInfo.getTablePath(), - metadataUpdater); - } catch (PartitionNotExistException e) { - return CompletableFuture.completedFuture(new LookupResult(Collections.emptyList())); + metadataUpdater) + .thenCompose(partitionId -> performLookup(partitionId, bkBytes, pkBytes)) + .exceptionally( + throwable -> { + // Handle partition not exist exception by returning null result + if (ExceptionUtils.findThrowable( + throwable, PartitionNotExistException.class) + .isPresent()) { + return new LookupResult((InternalRow) null); + } + // Re-throw other exceptions + throw new RuntimeException(throwable); + }); + } else { + // No partition, directly perform lookup + return performLookup(null, bkBytes, pkBytes); } + } catch (Exception e) { + return FutureUtils.failedCompletableFuture(e); } + } + /** + * Perform the actual lookup operation and process the result. + * + * @param partitionId the partition ID, or null if the table is not partitioned + * @param bkBytes the encoded bucket key bytes + * @param pkBytes the encoded primary key bytes + * @return a CompletableFuture containing the lookup result + */ + private CompletableFuture performLookup( + @Nullable Long partitionId, byte[] bkBytes, byte[] pkBytes) { int bucketId = bucketingFunction.bucketing(bkBytes, numBuckets); TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId); - CompletableFuture lookupFuture = new CompletableFuture<>(); - lookupClient + return lookupClient .lookup(tableInfo.getTablePath(), tableBucket, pkBytes) - .whenComplete( - (result, error) -> { - if (error != null) { - lookupFuture.completeExceptionally(error); - } else { - handleLookupResponse( - result == null - ? Collections.emptyList() - : Collections.singletonList(result), - lookupFuture); - } + .thenCompose( + result -> { + CompletableFuture resultFuture = + new CompletableFuture<>(); + handleLookupResponse( + result == null + ? Collections.emptyList() + : Collections.singletonList(result), + resultFuture); + return resultFuture; }); - return lookupFuture; } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClusterMetadataFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClusterMetadataFetcher.java new file mode 100644 index 0000000000..db2e1c49fe --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/ClusterMetadataFetcher.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.client.metadata; + +import org.apache.fluss.cluster.Cluster; +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; + +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * An abstraction for fetching metadata and rebuilding cluster. This interface allows dependency + * injection for testing purposes. + */ +@FunctionalInterface +public interface ClusterMetadataFetcher { + + /** + * Fetch metadata and rebuild cluster asynchronously. + * + * @param gateway the gateway to send request + * @param partialUpdate whether to perform partial update (merge with existing cluster) + * @param originCluster the original cluster to merge with (if partial update) + * @param tablePaths tables to request metadata for + * @param tablePartitions partitions to request metadata for + * @param tablePartitionIds partition ids to request metadata for + * @return a future that completes with the new cluster + */ + CompletableFuture fetch( + AdminReadOnlyGateway gateway, + boolean partialUpdate, + Cluster originCluster, + @Nullable Set tablePaths, + @Nullable Collection tablePartitions, + @Nullable Collection tablePartitionIds); +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java index 5cb7140445..6b0ab839b4 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java @@ -19,6 +19,7 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.client.utils.ClientUtils; +import org.apache.fluss.client.utils.MetadataUtils; import org.apache.fluss.cluster.BucketLocation; import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; @@ -39,73 +40,240 @@ import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.utils.ExceptionUtils; +import org.apache.fluss.utils.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; -import static org.apache.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode; import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster; import static org.apache.fluss.utils.ExceptionUtils.stripExecutionException; +import static 
org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
-/** The updater to initialize and update client metadata. */
+/**
+ * The updater to initialize and update client metadata.
+ *
+ * <p>This class manages metadata updates with batching and deduplication to avoid request storms.
+ * Inspired by {@code AdjustIsrManager}, it uses:
+ *
+ * <ul>
+ *   <li>Request deduplication: Multiple concurrent requests for the same resource are merged
+ *   <li>Batch processing: All pending requests are sent together in one RPC call
+ *   <li>Single in-flight request: Only one metadata request can be in-flight at a time
+ *   <li>Asynchronous API: Provides a CompletableFuture-based interface to avoid blocking
+ * </ul>
+ */ public class MetadataUpdater { private static final Logger LOG = LoggerFactory.getLogger(MetadataUpdater.class); private static final int MAX_RETRY_TIMES = 3; private static final int RETRY_INTERVAL_MS = 100; + /** Maximum retry times for metadata update requests before giving up. */ + private static final int MAX_METADATA_UPDATE_RETRIES = 10; + private final Configuration conf; private final RpcClient rpcClient; + private final ScheduledExecutorService scheduler; private final Set unavailableTabletServerIds = new CopyOnWriteArraySet<>(); - protected volatile Cluster cluster; + + /** The metadata fetcher, can be injected for testing. */ + private final ClusterMetadataFetcher metadataFetcher; + + /** The current cluster state, protected by read-write lock. */ + @GuardedBy("clusterRWLock") + protected Cluster cluster; + + /** Read-write lock for cluster access. */ + protected final ReadWriteLock clusterRWLock = new ReentrantReadWriteLock(); + + /** + * Pending metadata update requests. Maps each atomic resource to the pending request info. + * Multiple requests for the same resource share the same update. Visible for testing. + */ + @VisibleForTesting + protected final Map pendingRequests = + MapUtils.newConcurrentHashMap(); + + /** Used to allow only one in-flight request at a time. */ + private final AtomicBoolean inflightRequest = new AtomicBoolean(false); + + /** Flag to indicate if a retry is already scheduled to avoid duplicate scheduling. */ + private final AtomicBoolean retryScheduled = new AtomicBoolean(false); public MetadataUpdater(Configuration conf, RpcClient rpcClient) { - this(rpcClient, conf, initializeCluster(conf, rpcClient)); + this( + rpcClient, + conf, + initializeCluster(conf, rpcClient), + // Adapt MetadataUtils method reference to ClusterMetadataFetcher interface + (gateway, + partialUpdate, + originCluster, + tablePaths, + tablePartitions, + tablePartitionIds) -> + MetadataUtils.sendMetadataRequestAndRebuildClusterAsync( + gateway, + partialUpdate, + originCluster, + tablePaths, + tablePartitions, + tablePartitionIds)); } @VisibleForTesting public MetadataUpdater(RpcClient rpcClient, Configuration conf, Cluster cluster) { + this( + rpcClient, + conf, + cluster, + // Adapt MetadataUtils method reference to ClusterMetadataFetcher interface + (gateway, + partialUpdate, + originCluster, + tablePaths, + tablePartitions, + tablePartitionIds) -> + MetadataUtils.sendMetadataRequestAndRebuildClusterAsync( + gateway, + partialUpdate, + originCluster, + tablePaths, + tablePartitions, + tablePartitionIds)); + } + + @VisibleForTesting + public MetadataUpdater( + RpcClient rpcClient, + Configuration conf, + Cluster cluster, + ClusterMetadataFetcher metadataFetcher) { this.rpcClient = rpcClient; this.conf = conf; + this.metadataFetcher = metadataFetcher; + this.scheduler = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "metadata-updater-scheduler"); + t.setDaemon(true); + return t; + }); this.cluster = cluster; } + // ========== Lifecycle Methods ========== + + /** + * Close the metadata updater and release resources. This will: 1. Shutdown the scheduler to + * prevent new retries 2. Cancel all pending requests 3. 
Clear internal state + */ + public void close() { + LOG.info("Closing MetadataUpdater"); + + // Shutdown scheduler immediately to prevent new tasks + scheduler.shutdownNow(); + + // Fail all pending requests + FlussRuntimeException closedException = + new FlussRuntimeException("MetadataUpdater is closed"); + for (Map.Entry entry : + pendingRequests.entrySet()) { + // Take snapshot and close to get all futures + List> allFutures = entry.getValue().snapshotAndClose(); + for (CompletableFuture future : allFutures) { + future.completeExceptionally(closedException); + } + } + + // Clear pending requests to free memory + pendingRequests.clear(); + unavailableTabletServerIds.clear(); + + LOG.info("MetadataUpdater closed"); + } + + // ========== Helper Methods ========== + + /** Await a CompletableFuture and convert exceptions appropriately for metadata operations. */ + private void awaitFuture(CompletableFuture future) { + try { + future.get(); + } catch (ExecutionException e) { + Throwable t = ExceptionUtils.stripExecutionException(e); + // Preserve PartitionNotExistException for callers to handle + if (t instanceof PartitionNotExistException) { + throw (PartitionNotExistException) t; + } + // For retriable exceptions, log warning but don't throw + else if (t instanceof RetriableException || t instanceof TimeoutException) { + LOG.warn("Failed to update metadata, but the exception is re-triable.", t); + } + // For other exceptions, wrap in FlussRuntimeException + else { + throw new FlussRuntimeException("Failed to update metadata", t); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new FlussRuntimeException("Interrupted while updating metadata", e); + } + } + + // ========== Cluster Metadata Access Methods ========== + public Cluster getCluster() { - return cluster; + return inReadLock(clusterRWLock, () -> cluster); } public @Nullable ServerNode getCoordinatorServer() { - return cluster.getCoordinatorServer(); + return getCluster().getCoordinatorServer(); + } + + public @Nullable ServerNode getRandomTabletServer() { + return getCluster().getRandomTabletServer(); } public Optional getPartitionId(PhysicalTablePath physicalTablePath) { - return cluster.getPartitionId(physicalTablePath); + return getCluster().getPartitionId(physicalTablePath); } public Long getPartitionIdOrElseThrow(PhysicalTablePath physicalTablePath) { - return cluster.getPartitionIdOrElseThrow(physicalTablePath); + return getCluster().getPartitionIdOrElseThrow(physicalTablePath); } public Optional getBucketLocation(TableBucket tableBucket) { - return cluster.getBucketLocation(tableBucket); + return getCluster().getBucketLocation(tableBucket); } public int leaderFor(TablePath tablePath, TableBucket tableBucket) { - Integer serverNode = cluster.leaderFor(tableBucket); + Integer serverNode = getCluster().leaderFor(tableBucket); if (serverNode == null) { for (int i = 0; i < MAX_RETRY_TIMES; i++) { // check if bucket is for a partition @@ -117,7 +285,7 @@ public int leaderFor(TablePath tablePath, TableBucket tableBucket) { } else { updateMetadata(Collections.singleton(tablePath), null, null); } - serverNode = cluster.leaderFor(tableBucket); + serverNode = getCluster().leaderFor(tableBucket); if (serverNode != null) { break; } @@ -135,14 +303,42 @@ public int leaderFor(TablePath tablePath, TableBucket tableBucket) { return serverNode; } - private @Nullable ServerNode getTabletServer(int id) { - return cluster.getTabletServer(id); - } + public Set getPhysicalTablePathByIds( + @Nullable Collection 
tableId, + @Nullable Collection tablePartitions) { + Set physicalTablePaths = new HashSet<>(); + Cluster currentCluster = getCluster(); - public @Nullable ServerNode getRandomTabletServer() { - return cluster.getRandomTabletServer(); + if (tableId != null) { + tableId.forEach( + id -> + currentCluster + .getTablePath(id) + .ifPresent( + p -> physicalTablePaths.add(PhysicalTablePath.of(p)))); + } + + if (tablePartitions != null) { + for (TablePartition tablePartition : tablePartitions) { + currentCluster + .getTablePath(tablePartition.getTableId()) + .ifPresent( + path -> { + Optional optPartition = + currentCluster.getPartitionName( + tablePartition.getPartitionId()); + optPartition.ifPresent( + p -> + physicalTablePaths.add( + PhysicalTablePath.of(path, p))); + }); + } + } + return physicalTablePaths; } + // ========== RPC Client Creation Methods ========== + public CoordinatorGateway newCoordinatorServerClient() { return GatewayClientProxy.createGatewayProxy( this::getCoordinatorServer, rpcClient, CoordinatorGateway.class); @@ -163,54 +359,44 @@ public TabletServerGateway newRandomTabletServerClient() { } } + private @Nullable ServerNode getTabletServer(int id) { + return getCluster().getTabletServer(id); + } + + // ========== Metadata Update Methods (Synchronous) ========== + + /** + * Check and update table metadata if needed. Only updates tables that are not present in cache. + */ public void checkAndUpdateTableMetadata(Set tablePaths) { - Set needUpdateTablePaths = - tablePaths.stream() - .filter(tablePath -> !cluster.getTableId(tablePath).isPresent()) - .collect(Collectors.toSet()); - if (!needUpdateTablePaths.isEmpty()) { - updateMetadata(needUpdateTablePaths, null, null); - } + awaitFuture(checkAndUpdateTableMetadataAsync(tablePaths)); } /** - * Check the partition exists in metadata cache, if not, try to update the metadata cache, if - * not exist yet, throw exception. + * Check and update partition metadata. Returns whether partition exists after update. * - *

and update partition metadata . + * @throws PartitionNotExistException if partition does not exist after update */ public boolean checkAndUpdatePartitionMetadata(PhysicalTablePath physicalTablePath) throws PartitionNotExistException { - if (!cluster.getPartitionId(physicalTablePath).isPresent()) { + Cluster currentCluster = getCluster(); + if (!currentCluster.getPartitionId(physicalTablePath).isPresent()) { updateMetadata(null, Collections.singleton(physicalTablePath), null); } - return cluster.getPartitionId(physicalTablePath).isPresent(); - } - - /** - * Check the table/partition bucket info for the given table bucket exist in metadata cache, if - * not, try to update the metadata cache. - */ - public void checkAndUpdateMetadata(TablePath tablePath, TableBucket tableBucket) { - if (tableBucket.getPartitionId() == null) { - checkAndUpdateTableMetadata(Collections.singleton(tablePath)); - } else { - checkAndUpdatePartitionMetadata( - tablePath, Collections.singleton(tableBucket.getPartitionId())); - } + return getCluster().getPartitionId(physicalTablePath).isPresent(); } /** - * Check the partitions info for the given partition ids exist in metadata cache, if not, try to - * update the metadata cache. + * Check and update partition metadata by partition IDs. * - *

Note: it'll assume the partition ids belong to the given {@code tablePath}
+ * <p>
Note: Assumes the partition IDs belong to the given table. */ public void checkAndUpdatePartitionMetadata( TablePath tablePath, Collection partitionIds) { + Cluster currentCluster = getCluster(); Set needUpdatePartitionIds = new HashSet<>(); for (Long partitionId : partitionIds) { - if (!cluster.getPartitionName(partitionId).isPresent()) { + if (!currentCluster.getPartitionName(partitionId).isPresent()) { needUpdatePartitionIds.add(partitionId); } } @@ -220,13 +406,27 @@ public void checkAndUpdatePartitionMetadata( } } + /** + * Check and update metadata for a table bucket. Updates either table or partition metadata + * based on bucket type. + */ + public void checkAndUpdateMetadata(TablePath tablePath, TableBucket tableBucket) { + if (tableBucket.getPartitionId() == null) { + checkAndUpdateTableMetadata(Collections.singleton(tablePath)); + } else { + checkAndUpdatePartitionMetadata( + tablePath, Collections.singleton(tableBucket.getPartitionId())); + } + } + + /** Update table or partition metadata. Forces a metadata update regardless of cache state. */ public void updateTableOrPartitionMetadata(TablePath tablePath, @Nullable Long partitionId) { Collection partitionIds = partitionId == null ? null : Collections.singleton(partitionId); updateMetadata(Collections.singleton(tablePath), null, partitionIds); } - /** Update the table or partition metadata info. */ + /** Update physical table metadata. Separates tables and partitions and updates accordingly. */ public void updatePhysicalTableMetadata(Set physicalTablePaths) { Set updateTablePaths = new HashSet<>(); Set updatePartitionPath = new HashSet<>(); @@ -240,59 +440,601 @@ public void updatePhysicalTableMetadata(Set physicalTablePath updateMetadata(updateTablePaths, updatePartitionPath, null); } + /** + * Update metadata for tables, partitions, and partition IDs. This is the main synchronous + * update method. + */ @VisibleForTesting public void updateMetadata( @Nullable Set tablePaths, @Nullable Collection tablePartitionNames, @Nullable Collection tablePartitionIds) throws PartitionNotExistException { + awaitFuture(updateMetadataAsync(tablePaths, tablePartitionNames, tablePartitionIds)); + } + + /** Invalidate bucket metadata for given physical table paths. */ + public void invalidPhysicalTableBucketMeta(Set physicalTablesToInvalid) { + if (!physicalTablesToInvalid.isEmpty()) { + inWriteLock( + clusterRWLock, + () -> { + cluster = cluster.invalidPhysicalTableBucketMeta(physicalTablesToInvalid); + return null; + }); + } + } + + // ========== Metadata Update Methods (Asynchronous) ========== + + /** + * Async version of checkAndUpdateTableMetadata. Only updates tables that are not present in + * cache. + */ + private CompletableFuture checkAndUpdateTableMetadataAsync(Set tablePaths) { + Cluster currentCluster = getCluster(); + Set needUpdateTablePaths = + tablePaths.stream() + .filter(tablePath -> !currentCluster.getTableId(tablePath).isPresent()) + .collect(Collectors.toSet()); + if (needUpdateTablePaths.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + // Submit each table as atomic resource + List> futures = new ArrayList<>(); + for (TablePath tablePath : needUpdateTablePaths) { + futures.add(submitTableUpdateAsync(tablePath)); + } + + // Wait for all to complete + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + } + + /** + * Async version that checks and updates partition metadata, returning the partition ID. + * + *

Uses request batching and deduplication to avoid metadata request storms. + */ + public CompletableFuture checkAndUpdatePartitionMetadataAsync( + PhysicalTablePath physicalTablePath) { + // First check if partition already exists in current cluster (with read lock) + Cluster currentCluster = getCluster(); + if (currentCluster.getPartitionId(physicalTablePath).isPresent()) { + return CompletableFuture.completedFuture( + currentCluster.getPartitionIdOrElseThrow(physicalTablePath)); + } + + // Not in cache, need to fetch from server with batching + return submitPartitionUpdateAsync(physicalTablePath) + .thenApply( + v -> { + return inReadLock( + clusterRWLock, + () -> { + if (cluster.getPartitionId(physicalTablePath).isPresent()) { + return cluster.getPartitionIdOrElseThrow( + physicalTablePath); + } else { + throw new FlussRuntimeException( + new PartitionNotExistException( + "Partition does not exist after update: " + + physicalTablePath)); + } + }); + }); + } + + // ========== Internal Batching and Request Management ========== + + /** + * Generic method to submit a metadata update request for any resource type. + * + *

This method handles the common logic of: - Atomically adding the future to an existing or + * new request - Handling race conditions with closed requests - Triggering metadata update + * propagation + * + * @param key the metadata resource key + * @return a future that completes when the metadata is updated + */ + private CompletableFuture submitMetadataUpdateAsync(MetadataResourceKey key) { + CompletableFuture future = new CompletableFuture<>(); + + // Use compute() to atomically add future or create new request if needed + // This prevents race condition where multiple threads might overwrite each other's requests + pendingRequests.compute( + key, + (k, existingRequest) -> { + if (existingRequest == null || existingRequest.isClosed()) { + // No existing request or existing request is closed, create new one + PendingMetadataRequest newRequest = new PendingMetadataRequest(); + newRequest.addFuture(future); + return newRequest; + } else { + // Try to add to existing request + boolean added = existingRequest.addFuture(future); + if (added) { + // Successfully added to existing request + return existingRequest; + } else { + // Failed to add (request was closed concurrently), create new one + // This should rarely happen as we already checked isClosed() above + LOG.debug( + "Failed to add future to existing request for {}, creating new request", + key); + PendingMetadataRequest newRequest = new PendingMetadataRequest(); + newRequest.addFuture(future); + return newRequest; + } + } + }); + + maybePropagateMetadataUpdate(); + return future; + } + + /** + * Submit a request to update metadata for a single table. Returns a future that completes when + * the table metadata is updated. + */ + private CompletableFuture submitTableUpdateAsync(TablePath tablePath) { + return submitMetadataUpdateAsync(MetadataResourceKey.forTable(tablePath)); + } + + /** + * Submit a request to update metadata for a single partition. Returns a future that completes + * when the partition metadata is updated. + */ + private CompletableFuture submitPartitionUpdateAsync(PhysicalTablePath partition) { + return submitMetadataUpdateAsync(MetadataResourceKey.forPartition(partition)); + } + + /** + * Submit a request to update metadata for a single partition ID. Returns a future that + * completes when the partition metadata is updated. + */ + private CompletableFuture submitPartitionIdUpdateAsync(Long partitionId) { + return submitMetadataUpdateAsync(MetadataResourceKey.forPartitionId(partitionId)); + } + + /** + * Clean up exhausted pending requests to prevent memory leaks. + * + *

<p>A request is cleaned up if it has exceeded the maximum retry count.
+ *
+ * <p>
Cleaned up requests will have their futures completed exceptionally. + * + * @param snapshot snapshot of pending requests to check and cleanup + * @return keys that were cleaned up and should be removed from further processing + */ + private Set cleanupExhaustedRequests( + Map snapshot) { + Set keysToRemove = new HashSet<>(); + for (Map.Entry entry : snapshot.entrySet()) { + MetadataResourceKey key = entry.getKey(); + PendingMetadataRequest request = entry.getValue(); + + if (request.getRetryCount() >= MAX_METADATA_UPDATE_RETRIES) { + LOG.warn( + "Cleaning up pending metadata request for {}: exceeded maximum retries ({})", + key, + MAX_METADATA_UPDATE_RETRIES); + // Atomically remove from pending map and close to get all futures + pendingRequests.remove(key); + keysToRemove.add(key); + List> allFutures = request.snapshotAndClose(); + // Fail all futures for this resource + TimeoutException exception = + new TimeoutException( + String.format( + "Metadata update failed for %s: exceeded maximum retries (%d)", + key, MAX_METADATA_UPDATE_RETRIES)); + for (CompletableFuture future : allFutures) { + future.completeExceptionally(exception); + } + } + } + return keysToRemove; + } + + /** Trigger sending pending metadata requests if no request is currently in-flight. */ + private void maybePropagateMetadataUpdate() { + // Send all pending items if there is not already a request in-flight + if (!pendingRequests.isEmpty() && inflightRequest.compareAndSet(false, true)) { + try { + sendBatchedMetadataRequest(); + } catch (Throwable e) { + // Handle any top-level exceptions + LOG.error("Failed to send batched metadata request", e); + clearInFlightRequest(); + // Retry in near future, avoid duplicate scheduling + maybeRetry(); + } + } + } + + /** Schedule a retry if not already scheduled to avoid task accumulation. */ + private void maybeRetry() { + if (retryScheduled.compareAndSet(false, true)) { + try { + scheduler.schedule( + () -> { + retryScheduled.set(false); + maybePropagateMetadataUpdate(); + }, + 50, + TimeUnit.MILLISECONDS); + } catch (Exception e) { + // If scheduling fails (e.g., scheduler is shut down), reset the flag + // and log the error. Pending requests will be cleaned up by close() method. + retryScheduled.set(false); + LOG.error( + "Failed to schedule metadata update retry. " + + "Pending requests will be failed when MetadataUpdater is closed.", + e); + } + } + } + + /** Aggregate all pending requests and send a single batched metadata request. */ + @VisibleForTesting + protected void sendBatchedMetadataRequest() { + // Step 1: Take a snapshot of all pending requests (by reference) + // We intentionally don't close them yet to allow new futures to join + // This improves batching efficiency - futures added during RPC will also be completed + Map snapshot = new HashMap<>(); + for (Map.Entry entry : + pendingRequests.entrySet()) { + snapshot.put(entry.getKey(), entry.getValue()); + } + + if (snapshot.isEmpty()) { + clearInFlightRequest(); + return; + } + + // Wrap everything in try-catch to ensure futures are always completed + try { + // Clean up exhausted requests to prevent memory leaks + Set exhaustedKeys = cleanupExhaustedRequests(snapshot); + // Remove exhausted keys from snapshot + exhaustedKeys.forEach(snapshot::remove); + + // Re-check after cleanup + if (snapshot.isEmpty()) { + clearInFlightRequest(); + return; + } + + // Continue with normal processing... 
+ sendBatchedMetadataRequestInternal(snapshot); + } catch (Throwable t) { + // Critical: if any unexpected exception occurs, fail all pending futures + // to prevent indefinite blocking + LOG.error( + "Unexpected error in sendBatchedMetadataRequest, failing all pending requests", + t); + failAllRequests(snapshot, t); + clearInFlightRequest(); + // Schedule retry for safety + maybeRetry(); + } + } + + /** + * Helper method to fail all requests in a snapshot with a given exception. Ensures all futures + * are completed to prevent memory leaks and blocking. + */ + private void failAllRequests( + Map snapshot, Throwable cause) { + for (Map.Entry entry : snapshot.entrySet()) { + // Atomically remove from pending map and close to get all futures + pendingRequests.remove(entry.getKey()); + List> allFutures = entry.getValue().snapshotAndClose(); + FlussRuntimeException exception = + new FlussRuntimeException( + "Metadata update failed due to unexpected error", cause); + for (CompletableFuture future : allFutures) { + future.completeExceptionally(exception); + } + } + } + + /** Internal method to send batched metadata request. Separated for better error handling. */ + private void sendBatchedMetadataRequestInternal( + Map snapshot) { + + // Aggregate all atomic resources by type + Set allTablePaths = new HashSet<>(); + Set allPartitions = new HashSet<>(); + Set allPartitionIds = new HashSet<>(); + + for (MetadataResourceKey key : snapshot.keySet()) { + switch (key.type) { + case TABLE: + allTablePaths.add(key.asTablePath()); + break; + case PARTITION: + allPartitions.add(key.asPhysicalTablePath()); + break; + case PARTITION_ID: + allPartitionIds.add(key.asPartitionId()); + break; + } + } + + LOG.debug( + "Sending batched metadata request: tables={}, partitions={}, partitionIds={}", + allTablePaths.size(), + allPartitions.size(), + allPartitionIds.size()); + + // Get gateway to an available tablet server + Cluster currentSnapshot = getCluster(); ServerNode serverNode = - getOneAvailableTabletServerNode(cluster, unavailableTabletServerIds); + MetadataUtils.getOneAvailableTabletServerNode( + currentSnapshot, unavailableTabletServerIds); + + if (serverNode == null) { + LOG.info( + "No available tablet server to update metadata, trying to re-initialize cluster"); + handleNoAvailableServer(snapshot); + return; + } + + // Create gateway and send request + AdminReadOnlyGateway gateway = + GatewayClientProxy.createGatewayProxy( + () -> serverNode, rpcClient, AdminReadOnlyGateway.class); + + // Use injected metadataFetcher to fetch metadata and rebuild cluster + metadataFetcher + .fetch( + gateway, + true, // partial update - merge with existing cluster + currentSnapshot, + allTablePaths.isEmpty() ? null : allTablePaths, + allPartitions.isEmpty() ? null : allPartitions, + allPartitionIds.isEmpty() ? 
null : allPartitionIds) + .whenComplete( + (newCluster, exception) -> { + try { + if (exception != null) { + // Mark this server as potentially unavailable + Throwable t = stripExecutionException(exception); + if (t instanceof RetriableException + || t instanceof TimeoutException) { + unavailableTabletServerIds.add(serverNode.id()); + LOG.warn( + "Marking tablet server {} as unavailable due to error", + serverNode.id(), + t); + } + handleMetadataRequestError(snapshot, exception); + // Retry in near future + maybeRetry(); + } else { + handleMetadataResponseWithNewCluster(newCluster, snapshot); + } + } finally { + clearInFlightRequest(); + } + // Try to send more pending requests if any + maybePropagateMetadataUpdate(); + }); + } + + /** + * Handle the case where no tablet server is available. + * + *

Important: This method must ensure that all futures in the snapshot are eventually + * completed, either by re-initializing and retrying, or by failing them explicitly. + */ + private void handleNoAvailableServer( + Map snapshot) { try { - synchronized (this) { - if (serverNode == null) { - LOG.info( - "No available tablet server to update metadata, try to re-initialize cluster using bootstrap server."); - cluster = initializeCluster(conf, rpcClient); - } else { - cluster = - sendMetadataRequestAndRebuildCluster( - cluster, - rpcClient, - tablePaths, - tablePartitionNames, - tablePartitionIds, - serverNode); - } + // Try to re-initialize cluster from bootstrap servers + Cluster newCluster = initializeCluster(conf, rpcClient); + inWriteLock( + clusterRWLock, + () -> { + cluster = newCluster; + return null; + }); + // Clear unavailable server set since we have fresh cluster info + unavailableTabletServerIds.clear(); + + // Successfully re-initialized cluster, retry without incrementing retry count + // The requests in snapshot are still in pendingRequests and will be processed + // Note: We don't remove requests from pendingRequests or close them + // They will be retried by the next maybePropagateMetadataUpdate() call + clearInFlightRequest(); + maybePropagateMetadataUpdate(); + } catch (Exception e) { + // Re-initialization failed + // Increment retry count for all requests + for (PendingMetadataRequest request : snapshot.values()) { + request.incrementAndGetRetryCount(); } + // handleMetadataRequestError will complete futures that exceeded max retries + // or keep them for retry if still under limit + handleMetadataRequestError(snapshot, e); + clearInFlightRequest(); + // Schedule retry - this ensures pending requests will be retried + maybeRetry(); + } + } + + /** Handle successful metadata response with the new cluster built by MetadataUtils. */ + private void handleMetadataResponseWithNewCluster( + Cluster newCluster, Map snapshot) { + try { + // Update cluster with write lock (single writer) + inWriteLock( + clusterRWLock, + () -> { + cluster = newCluster; + return null; + }); + + // Complete all futures successfully + // Important: We only remove and close the request if it's still the same object + // in pendingRequests. This prevents completing futures from a newer request. 
+ for (Map.Entry entry : + snapshot.entrySet()) { + MetadataResourceKey key = entry.getKey(); + PendingMetadataRequest request = entry.getValue(); - Map aliveTabletServers = cluster.getAliveTabletServers(); - unavailableTabletServerIds.removeIf(aliveTabletServers::containsKey); - if (!unavailableTabletServerIds.isEmpty()) { - LOG.info( - "After update metadata, unavailable tabletServer set: {}", - unavailableTabletServerIds); + // Atomically remove and close the request + List> allFutures = + removeAndClosePendingRequest(key, request); + // Complete all futures for this resource (including ones added during RPC) + // If list is empty, it means the request was replaced by a new one + for (CompletableFuture future : allFutures) { + future.complete(null); + } } + + // Clear unavailable servers as we successfully communicated + Map aliveServers = getCluster().getAliveTabletServers(); + unavailableTabletServerIds.removeIf(aliveServers::containsKey); + } catch (Exception e) { - Throwable t = stripExecutionException(e); - if (t instanceof RetriableException || t instanceof TimeoutException) { - if (serverNode != null) { - unavailableTabletServerIds.add(serverNode.id()); + LOG.error("Failed to process metadata response", e); + // Increment retry count for all requests + for (PendingMetadataRequest request : snapshot.values()) { + request.incrementAndGetRetryCount(); + } + handleMetadataRequestError(snapshot, e); + } + } + + /** Handle metadata request error. */ + private void handleMetadataRequestError( + Map snapshot, Throwable error) { + LOG.warn("Metadata request failed", error); + + Throwable cause = stripExecutionException(error); + + // For non-retriable errors, fail the requests immediately + if (!(cause instanceof RetriableException) && !(cause instanceof TimeoutException)) { + for (Map.Entry entry : + snapshot.entrySet()) { + MetadataResourceKey key = entry.getKey(); + PendingMetadataRequest request = entry.getValue(); + + // Atomically remove and close the request + List> allFutures = + removeAndClosePendingRequest(key, request); + // Fail all futures for this resource + for (CompletableFuture future : allFutures) { + // Pass the original exception without wrapping to preserve exception type + // (e.g., PartitionNotExistException needs to be caught by callers) + future.completeExceptionally(cause); + } + } + } else { + // For retriable errors, check retry count and decide + for (Map.Entry entry : + snapshot.entrySet()) { + MetadataResourceKey key = entry.getKey(); + PendingMetadataRequest request = entry.getValue(); + int retryCount = request.getRetryCount(); + + // Check if exceeded max retries + if (retryCount >= MAX_METADATA_UPDATE_RETRIES) { LOG.warn( - "tabletServer {} is unavailable for updating metadata for retriable exception. 
unavailable tabletServer set {}", - serverNode, - unavailableTabletServerIds); + "Metadata request for {} exceeded max retries ({}), failing request", + key, + MAX_METADATA_UPDATE_RETRIES); + // Atomically remove and close the request + List> allFutures = + removeAndClosePendingRequest(key, request); + // Fail all futures + TimeoutException timeoutException = + new TimeoutException( + String.format( + "Metadata update failed after %d retries for %s", + MAX_METADATA_UPDATE_RETRIES, key)); + for (CompletableFuture future : allFutures) { + future.completeExceptionally(timeoutException); + } } - LOG.warn("Failed to update metadata, but the exception is re-triable.", t); - } else if (t instanceof PartitionNotExistException) { - LOG.warn("Failed to update metadata because the partition does not exist", t); - throw (PartitionNotExistException) t; - } else { - throw new FlussRuntimeException("Failed to update metadata", t); } + // Keep remaining items in the map for retry + // They will be retried by the scheduled task + } + } + + /** Clear the in-flight request flag. */ + @VisibleForTesting + protected void clearInFlightRequest() { + if (!inflightRequest.compareAndSet(true, false)) { + LOG.warn("Attempting to clear in-flight flag when no request is in-flight"); + } + } + + /** + * Atomically remove a pending request and close it to get all futures. + * + *

This helper method encapsulates the common pattern of: 1. Checking if the request is still + * the same object in the map 2. Removing it atomically 3. Closing it to get all futures + * + * @return all futures from the request, or empty list if the request was already replaced + */ + private List> removeAndClosePendingRequest( + MetadataResourceKey key, PendingMetadataRequest request) { + boolean removed = pendingRequests.remove(key, request); + if (removed) { + return request.snapshotAndClose(); + } + return Collections.emptyList(); + } + + /** + * Async version that updates metadata for tables, partitions, and partition IDs. + * + *

This is the core async implementation that uses request batching and deduplication to + * avoid metadata request storms. + */ + public CompletableFuture updateMetadataAsync( + @Nullable Set tablePaths, + @Nullable Collection tablePartitionNames, + @Nullable Collection tablePartitionIds) { + // Collect all futures for atomic resources + List> futures = new ArrayList<>(); + + // Submit each table as atomic resource + if (tablePaths != null) { + for (TablePath tablePath : tablePaths) { + futures.add(submitTableUpdateAsync(tablePath)); + } + } + + // Submit each partition as atomic resource + if (tablePartitionNames != null) { + for (PhysicalTablePath partition : tablePartitionNames) { + futures.add(submitPartitionUpdateAsync(partition)); + } + } + + // Submit each partition ID as atomic resource + if (tablePartitionIds != null) { + for (Long partitionId : tablePartitionIds) { + futures.add(submitPartitionIdUpdateAsync(partitionId)); + } + } + + if (futures.isEmpty()) { + return CompletableFuture.completedFuture(null); } + + // Wait for all to complete + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } + // ========== Cluster Initialization Methods ========== + /** * Initialize Cluster. This step just to get the coordinator server address and alive tablet * servers according to the config {@link ConfigOptions#BOOTSTRAP_SERVERS}. @@ -403,41 +1145,149 @@ private static Cluster tryToInitializeCluster(AdminReadOnlyGateway adminReadOnly return sendMetadataRequestAndRebuildCluster(adminReadOnlyGateway, Collections.emptySet()); } - /** Invalid the bucket metadata for the given physical table paths. */ - public void invalidPhysicalTableBucketMeta(Set physicalTablesToInvalid) { - if (!physicalTablesToInvalid.isEmpty()) { - cluster = cluster.invalidPhysicalTableBucketMeta(physicalTablesToInvalid); + /** + * Tracks metadata update request with retry information to prevent memory leaks. + * + *

<p>Thread-safety: Uses ReadWriteLock to allow concurrent reads while ensuring exclusive
+ * writes. This is beneficial because:
+ *
+ * <ul>
+ *   <li>Read operations (isClosed, getRetryCount) may be called frequently by multiple threads
+ *   <li>Write operations (addFuture, incrementAndGetRetryCount) need exclusive access
+ *   <li>ReadWriteLock allows multiple concurrent readers, improving throughput in high
+ *       concurrency scenarios
+ * </ul>
+ */ + @VisibleForTesting + protected static class PendingMetadataRequest { + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final List> futures = new ArrayList<>(); + private int retryCount = 0; + private boolean closed = false; + + /** + * Add a future to this request. Returns false if the request is already closed. + * + *

Uses write lock to ensure atomicity with snapshotAndClose. + */ + boolean addFuture(CompletableFuture future) { + return inWriteLock( + lock, + () -> { + if (closed) { + return false; + } + futures.add(future); + return true; + }); + } + + /** + * Take a snapshot of all futures and mark this request as closed. + * + *

This operation atomically closes the request and gets all futures. After this call, no + * new futures can be added. + * + * @return all futures that were added before closing + */ + List> snapshotAndClose() { + return inWriteLock( + lock, + () -> { + closed = true; + return new ArrayList<>(futures); + }); + } + + /** Increment and get retry count. Uses write lock as it modifies state. */ + int incrementAndGetRetryCount() { + return inWriteLock(lock, () -> ++retryCount); + } + + /** + * Get current retry count. Uses read lock to allow concurrent access with other readers. + */ + int getRetryCount() { + return inReadLock(lock, () -> retryCount); + } + + /** + * Check if closed. Uses read lock to allow concurrent access with other readers. + * + *

This is particularly important as this method may be called frequently in {@link + * #submitMetadataUpdateAsync} by multiple threads checking if they can add futures to an + * existing request. + */ + boolean isClosed() { + return inReadLock(lock, () -> closed); } } - /** Get the table physical paths by table ids and partition ids. */ - public Set getPhysicalTablePathByIds( - @Nullable Collection tableId, - @Nullable Collection tablePartitions) { - Set physicalTablePaths = new HashSet<>(); - if (tableId != null) { - tableId.forEach( - id -> - cluster.getTablePath(id) - .ifPresent( - p -> physicalTablePaths.add(PhysicalTablePath.of(p)))); + /** + * Key to identify and deduplicate atomic metadata resources. Each key represents exactly one + * resource (table, partition, or partition ID). + */ + @VisibleForTesting + protected static class MetadataResourceKey { + protected final ResourceType type; + protected final Object resource; + + /** Types of atomic metadata resources. */ + enum ResourceType { + TABLE, // Single table + PARTITION, // Single partition + PARTITION_ID // Single partition ID } - if (tablePartitions != null) { - for (TablePartition tablePartition : tablePartitions) { - cluster.getTablePath(tablePartition.getTableId()) - .ifPresent( - path -> { - Optional optPartition = - cluster.getPartitionName( - tablePartition.getPartitionId()); - optPartition.ifPresent( - p -> - physicalTablePaths.add( - PhysicalTablePath.of(path, p))); - }); + private MetadataResourceKey(ResourceType type, Object resource) { + this.type = type; + this.resource = resource; + } + + static MetadataResourceKey forTable(TablePath tablePath) { + return new MetadataResourceKey(ResourceType.TABLE, tablePath); + } + + static MetadataResourceKey forPartition(PhysicalTablePath partition) { + return new MetadataResourceKey(ResourceType.PARTITION, partition); + } + + static MetadataResourceKey forPartitionId(Long partitionId) { + return new MetadataResourceKey(ResourceType.PARTITION_ID, partitionId); + } + + TablePath asTablePath() { + return (TablePath) resource; + } + + PhysicalTablePath asPhysicalTablePath() { + return (PhysicalTablePath) resource; + } + + Long asPartitionId() { + return (Long) resource; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; } + MetadataResourceKey that = (MetadataResourceKey) o; + return type == that.type && resource.equals(that.resource); + } + + @Override + public int hashCode() { + return 31 * type.hashCode() + resource.hashCode(); + } + + @Override + public String toString() { + return type + ":" + resource; } - return physicalTablePaths; } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientUtils.java index 8629c6f95a..e1b801a961 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientUtils.java @@ -32,6 +32,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -132,4 +133,23 @@ public static Long getPartitionId( metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath); return metadataUpdater.getCluster().getPartitionIdOrElseThrow(physicalTablePath); } + + /** + * Async version of getPartitionId that does not block 
the calling thread. This is used to avoid
+ * deadlock when called from Netty IO threads.
+ *
+ * <p>
Return the id of the partition the row belongs to. It'll try to update the metadata if the + * partition doesn't exist. If the partition doesn't exist yet after update metadata, the + * returned future will be completed exceptionally with {@link PartitionNotExistException}. + */ + public static CompletableFuture getPartitionIdAsync( + InternalRow row, + PartitionGetter partitionGetter, + TablePath tablePath, + MetadataUpdater metadataUpdater) { + checkNotNull(partitionGetter, "partitionGetter shouldn't be null."); + String partitionName = partitionGetter.getPartition(row); + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName); + return metadataUpdater.checkAndUpdatePartitionMetadataAsync(physicalTablePath); + } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java index 2990054999..21a2b4d203 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java @@ -21,6 +21,7 @@ import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.exception.StaleMetadataException; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; @@ -46,8 +47,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -91,6 +94,27 @@ public static Cluster sendMetadataRequestAndRebuildCluster( gateway, true, cluster, tablePaths, tablePartitionNames, tablePartitionIds); } + /** + * Async version of sendMetadataRequestAndRebuildCluster. This is used to avoid deadlock when + * called from Netty IO threads. + */ + public static CompletableFuture sendMetadataRequestAndRebuildClusterAsync( + Cluster cluster, + RpcClient client, + @Nullable Set tablePaths, + @Nullable Collection tablePartitionNames, + @Nullable Collection tablePartitionIds) { + AdminReadOnlyGateway gateway = + GatewayClientProxy.createGatewayProxy( + () -> + getOneAvailableTabletServerNode( + cluster, java.util.Collections.emptySet()), + client, + AdminReadOnlyGateway.class); + return sendMetadataRequestAndRebuildClusterAsync( + gateway, true, cluster, tablePaths, tablePartitionNames, tablePartitionIds); + } + /** maybe partial update cluster. */ public static Cluster sendMetadataRequestAndRebuildCluster( AdminReadOnlyGateway gateway, @@ -100,6 +124,24 @@ public static Cluster sendMetadataRequestAndRebuildCluster( @Nullable Collection tablePartitions, @Nullable Collection tablePartitionIds) throws ExecutionException, InterruptedException, TimeoutException { + return sendMetadataRequestAndRebuildClusterAsync( + gateway, + partialUpdate, + originCluster, + tablePaths, + tablePartitions, + tablePartitionIds) + .get(30, TimeUnit.SECONDS); + } + + /** Async version of sendMetadataRequestAndRebuildCluster. 
*/ + public static CompletableFuture sendMetadataRequestAndRebuildClusterAsync( + AdminReadOnlyGateway gateway, + boolean partialUpdate, + Cluster originCluster, + @Nullable Set tablePaths, + @Nullable Collection tablePartitions, + @Nullable Collection tablePartitionIds) { MetadataRequest metadataRequest = ClientRpcMessageUtils.makeMetadataRequest( tablePaths, tablePartitions, tablePartitionIds); @@ -153,10 +195,7 @@ public static Cluster sendMetadataRequestAndRebuildCluster( newBucketLocations, newTablePathToTableId, newPartitionIdByPath); - }) - .get(30, TimeUnit.SECONDS); // TODO currently, we don't have timeout logic in - // RpcClient, it will let the get() block forever. So we - // time out here + }); } private static NewTableMetadata getTableMetadataToUpdate( @@ -194,8 +233,35 @@ private static NewTableMetadata getTableMetadataToUpdate( pbPartitionMetadataList.forEach( pbPartitionMetadata -> { long tableId = pbPartitionMetadata.getTableId(); - // the table path should be initialized at begin - TablePath tablePath = cluster.getTablePathOrElseThrow(tableId); + // Get table path from new metadata first, if not found, get from cluster cache. + // This handles the case where partition metadata is returned but corresponding + // table metadata is not included in the response (e.g., when only requesting + // partition update). The cluster parameter now always contains the latest state + // (after fixing lost update issue from commit 1534e17c), but we still check + // response first for completeness. + TablePath tablePath = null; + for (Map.Entry entry : newTablePathToTableId.entrySet()) { + if (entry.getValue().equals(tableId)) { + tablePath = entry.getKey(); + break; + } + } + + if (tablePath == null) { + // Table metadata not in response, try to get from cluster cache + Optional tablePathOpt = cluster.getTablePath(tableId); + if (!tablePathOpt.isPresent()) { + // If table is not found in both response and cache, the partition + // metadata cannot be processed. Throw exception to indicate the issue. 
+ throw new PartitionNotExistException( + String.format( + "%s(p=%s) not found in cluster.", + "[table with id " + tableId + "]", + pbPartitionMetadata.getPartitionName())); + } + tablePath = tablePathOpt.get(); + } + PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, pbPartitionMetadata.getPartitionName()); newPartitionIdByPath.put( diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java index 6717cfe5ae..cab44894ae 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java @@ -21,18 +21,31 @@ import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.StaleMetadataException; +import org.apache.fluss.metadata.PhysicalTablePath; +import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; import org.apache.fluss.rpc.metrics.TestingClientMetricGroup; import org.apache.fluss.server.coordinator.TestCoordinatorGateway; +import org.apache.fluss.utils.concurrent.FutureUtils; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import javax.annotation.Nullable; + +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.buildMetadataResponse; import static org.assertj.core.api.Assertions.assertThat; @@ -46,6 +59,15 @@ public class MetadataUpdaterTest { private static final ServerNode TS_NODE = new ServerNode(1, "localhost", 8080, ServerType.TABLET_SERVER); + private MetadataUpdater metadataUpdater; + + @AfterEach + void tearDown() { + if (metadataUpdater != null) { + metadataUpdater.close(); + } + } + @Test void testInitializeClusterWithRetries() throws Exception { Configuration configuration = new Configuration(); @@ -70,6 +92,260 @@ void testInitializeClusterWithRetries() throws Exception { .hasMessageContaining("The metadata is stale."); } + @Test + void testRequestDeduplication() throws Exception { + // Test that duplicate requests for the same resource are deduplicated + Configuration conf = new Configuration(); + RpcClient rpcClient = RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false); + TestClusterMetadataFetcher testFetcher = new TestClusterMetadataFetcher(); + + Cluster initialCluster = + new Cluster( + Collections.singletonMap(1, TS_NODE), + CS_NODE, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap()); + + metadataUpdater = new MetadataUpdater(rpcClient, conf, initialCluster, testFetcher); + + TablePath table = TablePath.of("db", "table"); + Set tables = Collections.singleton(table); + + // Submit the same table request multiple times concurrently + CountDownLatch latch = new CountDownLatch(3); + for (int i = 0; i < 3; i++) { + new Thread( + () -> { + try { + 
+                                    metadataUpdater.checkAndUpdateTableMetadata(tables);
+                                } finally {
+                                    latch.countDown();
+                                }
+                            })
+                    .start();
+        }
+
+        // Wait for all threads to complete
+        assertThat(latch.await(5, TimeUnit.SECONDS)).as("All requests should complete").isTrue();
+
+        // CRITICAL: Duplicate requests for the SAME table MUST be deduplicated into exactly 1
+        // fetch
+        assertThat(testFetcher.getFetchCount())
+                .as("Duplicate requests for the same table MUST be deduplicated into one fetch")
+                .isEqualTo(1);
+
+        // Verify the correct table was requested
+        Set<TablePath> requestedTables = testFetcher.getLastRequestedTables();
+        assertThat(requestedTables).containsExactly(table);
+
+        // Verify no resource leaks
+        assertThat(metadataUpdater.pendingRequests)
+                .as("All pending requests should be cleaned up after completion")
+                .isEmpty();
+    }
+
+    @Test
+    void testRequestBatching() throws Exception {
+        // Test that multiple concurrent requests for DIFFERENT tables are batched into one fetch
+        Configuration conf = new Configuration();
+        RpcClient rpcClient = RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false);
+        TestClusterMetadataFetcher testFetcher = new TestClusterMetadataFetcher();
+
+        Cluster initialCluster =
+                new Cluster(
+                        Collections.singletonMap(1, TS_NODE),
+                        CS_NODE,
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+
+        metadataUpdater = new MetadataUpdater(rpcClient, conf, initialCluster, testFetcher);
+
+        TablePath table1 = TablePath.of("db", "table1");
+        TablePath table2 = TablePath.of("db", "table2");
+        TablePath table3 = TablePath.of("db", "table3");
+
+        // Use CountDownLatch to ensure threads start as close together as possible
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch readyLatch = new CountDownLatch(3);
+        CountDownLatch completionLatch = new CountDownLatch(3);
+
+        Thread t1 =
+                new Thread(
+                        () -> {
+                            try {
+                                readyLatch.countDown();
+                                startLatch.await(); // Wait for all threads to be ready
+                                metadataUpdater.checkAndUpdateTableMetadata(
+                                        Collections.singleton(table1));
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+
+        Thread t2 =
+                new Thread(
+                        () -> {
+                            try {
+                                readyLatch.countDown();
+                                startLatch.await();
+                                metadataUpdater.checkAndUpdateTableMetadata(
+                                        Collections.singleton(table2));
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+
+        Thread t3 =
+                new Thread(
+                        () -> {
+                            try {
+                                readyLatch.countDown();
+                                startLatch.await();
+                                metadataUpdater.checkAndUpdateTableMetadata(
+                                        Collections.singleton(table3));
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            } finally {
+                                completionLatch.countDown();
+                            }
+                        });
+
+        t1.start();
+        t2.start();
+        t3.start();
+
+        // Wait for all threads to be ready
+        assertThat(readyLatch.await(2, TimeUnit.SECONDS))
+                .as("All threads should be ready")
+                .isTrue();
+
+        // Release all threads at once to maximize concurrency
+        startLatch.countDown();
+
+        // Wait for all to complete
+        assertThat(completionLatch.await(5, TimeUnit.SECONDS))
+                .as("All requests should complete within timeout")
+                .isTrue();
+
+        // CRITICAL: Expected batching behavior depends on thread scheduling:
+        // Best case: All 3 requests arrive before first fetch starts -> 1 fetch with all 3 tables
+        // Common case: First request triggers fetch, other 2 queue -> 2 fetches
+        // Worst case: Each request arrives after previous fetch completes -> 3 fetches
+        // The key is that batching REDUCES fetches from the theoretical maximum of 3
+        assertThat(testFetcher.getFetchCount())
+                .as(
+                        "Batching should reduce fetch count. "
+                                + "With 3 concurrent requests, expect 1-3 fetches depending on timing")
+                .isGreaterThanOrEqualTo(1)
+                .isLessThanOrEqualTo(3);
+
+        // Verify all three tables were requested (across all fetches)
+        Set<TablePath> allRequestedTables = testFetcher.getAllRequestedTables();
+        assertThat(allRequestedTables)
+                .as("All three tables should be requested across all batched fetches")
+                .containsExactlyInAnyOrder(table1, table2, table3);
+
+        // Verify no resource leaks
+        assertThat(metadataUpdater.pendingRequests).isEmpty();
+    }
+
+    @Test
+    void testRetryAndCleanup() throws Exception {
+        // Test that failed requests trigger retries and are eventually cleaned up
+        Configuration conf = new Configuration();
+        RpcClient rpcClient = RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false);
+        TestClusterMetadataFetcher testFetcher = new TestClusterMetadataFetcher();
+        testFetcher.setShouldFail(true);
+
+        Cluster initialCluster =
+                new Cluster(
+                        Collections.singletonMap(1, TS_NODE),
+                        CS_NODE,
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+
+        metadataUpdater = new MetadataUpdater(rpcClient, conf, initialCluster, testFetcher);
+
+        TablePath table = TablePath.of("db", "table");
+        Set<TablePath> tables = Collections.singleton(table);
+
+        // Submit request that will fail - expect FlussRuntimeException wrapping the failure
+        assertThatThrownBy(() -> metadataUpdater.checkAndUpdateTableMetadata(tables))
+                .as(
+                        "After max retries, request should fail with FlussRuntimeException indicating update failure")
+                .isInstanceOf(FlussRuntimeException.class)
+                .hasMessageContaining("Failed to update metadata");
+
+        // CRITICAL: Verify that fetch was attempted
+        // Note: With continuous failures, the system may not retry the actual fetch
+        // but will increment retry count internally until max retries is reached
+        assertThat(testFetcher.getFetchCount())
+                .as("At least one fetch attempt should be made before giving up")
+                .isGreaterThanOrEqualTo(1);
+
+        // CRITICAL: Verify no resource leaks after failure and cleanup
+        assertThat(metadataUpdater.pendingRequests)
+                .as(
+                        "After max retries and failure, pending requests MUST be cleaned up to prevent leaks")
+                .isEmpty();
+    }
+
+    @Test
+    void testResourceCleanupOnSuccess() throws Exception {
+        // Test that resources (pending requests and futures) are properly cleaned up on success
+        Configuration conf = new Configuration();
+        RpcClient rpcClient = RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false);
+        TestClusterMetadataFetcher testFetcher = new TestClusterMetadataFetcher();
+
+        Cluster initialCluster =
+                new Cluster(
+                        Collections.singletonMap(1, TS_NODE),
+                        CS_NODE,
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+
+        metadataUpdater = new MetadataUpdater(rpcClient, conf, initialCluster, testFetcher);
+
+        // Submit multiple sequential requests
+        for (int i = 0; i < 5; i++) {
+            TablePath table = TablePath.of("db", "table" + i);
+            metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(table));
+
+            // CRITICAL: After EACH request completes, pending map should be empty
+            assertThat(metadataUpdater.pendingRequests)
+                    .as("Pending requests must be cleaned up after each successful request")
+                    .isEmpty();
+        }
+
+        // Verify requests were processed
+        assertThat(testFetcher.getFetchCount())
+                .as("All 5 requests should have been processed")
+                .isEqualTo(5);
+
+        // Verify updater is still functional after multiple operations
+        TablePath newTable = TablePath.of("db", "new_table");
+        int previousCount = testFetcher.getFetchCount();
+
+        metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(newTable));
+
+        assertThat(testFetcher.getFetchCount())
+                .as("MetadataUpdater should remain functional after cleanup")
+                .isEqualTo(previousCount + 1);
+
+        // Final cleanup verification
+        assertThat(metadataUpdater.pendingRequests)
+                .as("Final cleanup verification: no pending requests")
+                .isEmpty();
+    }
+
     private static final class TestingAdminReadOnlyGateway extends TestCoordinatorGateway {

         private final int maxRetryCount;
@@ -95,4 +371,59 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
             }
         }
     }
+
+    /**
+     * Test implementation of ClusterMetadataFetcher for unit testing. Simulates metadata fetching
+     * with configurable behavior.
+     */
+    private static class TestClusterMetadataFetcher implements ClusterMetadataFetcher {
+        private final AtomicInteger fetchCount = new AtomicInteger(0);
+        private volatile Set<TablePath> lastRequestedTables;
+        private final Set<TablePath> allRequestedTables = new HashSet<>();
+        private volatile boolean shouldFail = false;
+
+        @Override
+        public CompletableFuture<Cluster> fetch(
+                AdminReadOnlyGateway gateway,
+                boolean partialUpdate,
+                Cluster originCluster,
+                @Nullable Set<TablePath> tablePaths,
+                @Nullable Collection<PhysicalTablePath> tablePartitions,
+                @Nullable Collection<Long> tablePartitionIds) {
+
+            fetchCount.incrementAndGet();
+            if (tablePaths != null) {
+                lastRequestedTables = new HashSet<>(tablePaths);
+                synchronized (allRequestedTables) {
+                    allRequestedTables.addAll(tablePaths);
+                }
+            }
+
+            if (shouldFail) {
+                return FutureUtils.failedCompletableFuture(
+                        new RuntimeException("Simulated fetch failure"));
+            }
+
+            // Return the origin cluster unchanged (successful fetch)
+            return CompletableFuture.completedFuture(originCluster);
+        }
+
+        public int getFetchCount() {
+            return fetchCount.get();
+        }
+
+        public Set<TablePath> getLastRequestedTables() {
+            return lastRequestedTables;
+        }
+
+        public Set<TablePath> getAllRequestedTables() {
+            synchronized (allRequestedTables) {
+                return new HashSet<>(allRequestedTables);
+            }
+        }
+
+        public void setShouldFail(boolean shouldFail) {
+            this.shouldFail = shouldFail;
+        }
+    }
 }
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index 2063951357..c8611ce106 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -40,6 +40,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;

+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
 /** Testing class for metadata updater. */
 public class TestingMetadataUpdater extends MetadataUpdater {

     public static final ServerNode COORDINATOR =
@@ -72,7 +74,15 @@ public TestingMetadataUpdater(
         super(
                 RpcClient.create(conf, TestingClientMetricGroup.newInstance(), false),
                 conf,
-                Cluster.empty());
+                Cluster.empty(),
+                // Provide a no-op metadata fetcher for testing
+                (gateway,
+                        partialUpdate,
+                        originCluster,
+                        tablePaths,
+                        tablePartitions,
+                        tablePartitionIds) ->
+                        java.util.concurrent.CompletableFuture.completedFuture(originCluster));
         initializeCluster(coordinatorServer, tabletServers, tableInfos);
         coordinatorGateway = new TestCoordinatorGateway();
         if (customGateways != null) {
@@ -133,15 +143,26 @@ public TestingMetadataUpdater build() {
         }
     }

-    public void updateCluster(Cluster cluster) {
-        this.cluster = cluster;
+    public void updateCluster(Cluster newCluster) {
+        setCluster(newCluster);
+    }
+
+    /** Helper method to set cluster for testing. */
+    private void setCluster(Cluster newCluster) {
+        inWriteLock(
+                clusterRWLock,
+                () -> {
+                    super.cluster = newCluster;
+                    return null;
+                });
     }

     @Override
     public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
+        Cluster currentCluster = getCluster();
         Set<TablePath> needUpdateTablePaths =
                 tablePaths.stream()
-                        .filter(tablePath -> !cluster.getTableId(tablePath).isPresent())
+                        .filter(tablePath -> !currentCluster.getTableId(tablePath).isPresent())
                         .collect(Collectors.toSet());
         if (!needUpdateTablePaths.isEmpty()) {
             throw new IllegalStateException(
@@ -163,7 +184,7 @@ public TabletServerGateway newRandomTabletServerClient() {

     @Override
     public TabletServerGateway newTabletServerClientForNode(int serverId) {
-        if (cluster.getTabletServer(serverId) == null) {
+        if (getCluster().getTabletServer(serverId) == null) {
             return null;
         } else {
             return tabletServerGatewayMap.get(serverId);
@@ -212,12 +233,12 @@ private void initializeCluster(
                                                     replicas)));
                     tableIdByPath.put(tablePath, tableId);
                 });
-        cluster =
+        setCluster(
                 new Cluster(
                         tabletServerMap,
                         coordinatorServer,
                         tablePathToBucketLocations,
                         tableIdByPath,
-                        Collections.emptyMap());
+                        Collections.emptyMap()));
     }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/concurrent/FutureUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/concurrent/FutureUtils.java
index d13862cf14..67d9c9b5d4 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/concurrent/FutureUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/concurrent/FutureUtils.java
@@ -90,6 +90,12 @@ public static void completeFromCallable(
         }
     }

+    public static <T> CompletableFuture<T> failedCompletableFuture(Throwable cause) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        future.completeExceptionally(cause);
+        return future;
+    }
+
     /** Runnable to complete the given future with a {@link TimeoutException}. */
     private static final class Timeout implements Runnable {