4141import org .apache .kafka .common .utils .BufferSupplier ;
4242import org .apache .kafka .common .utils .ChildFirstClassLoader ;
4343import org .apache .kafka .common .utils .CloseableIterator ;
44- import org .apache .kafka .common .utils .KafkaThread ;
4544import org .apache .kafka .common .utils .LogContext ;
45+ import org .apache .kafka .common .utils .ThreadUtils ;
4646import org .apache .kafka .common .utils .Time ;
4747import org .apache .kafka .common .utils .Utils ;
4848import org .apache .kafka .server .common .CheckpointFile ;
130130import java .util .concurrent .ScheduledThreadPoolExecutor ;
131131import java .util .concurrent .ThreadFactory ;
132132import java .util .concurrent .TimeUnit ;
133- import java .util .concurrent .atomic .AtomicInteger ;
134133import java .util .concurrent .locks .Condition ;
135134import java .util .concurrent .locks .ReentrantLock ;
136135import java .util .function .BiConsumer ;
162161public class RemoteLogManager implements Closeable {
163162
164163 private static final Logger LOGGER = LoggerFactory .getLogger (RemoteLogManager .class );
165- private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader" ;
164+ private static final String REMOTE_LOG_READER_THREAD_NAME_PATTERN = "remote-log-reader-%d " ;
166165 private final RemoteLogManagerConfig rlmConfig ;
167166 private final int brokerId ;
168167 private final String logDir ;
@@ -255,18 +254,18 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
255254 indexCache = new RemoteIndexCache (rlmConfig .remoteLogIndexFileCacheTotalSizeBytes (), remoteLogStorageManager , logDir );
256255 delayInMs = rlmConfig .remoteLogManagerTaskIntervalMs ();
257256 rlmCopyThreadPool = new RLMScheduledThreadPool (rlmConfig .remoteLogManagerCopierThreadPoolSize (),
258- "RLMCopyThreadPool" , "kafka-rlm-copy-thread-pool-" );
257+ "RLMCopyThreadPool" , "kafka-rlm-copy-thread-pool-%d " );
259258 rlmExpirationThreadPool = new RLMScheduledThreadPool (rlmConfig .remoteLogManagerExpirationThreadPoolSize (),
260- "RLMExpirationThreadPool" , "kafka-rlm-expiration-thread-pool-" );
259+ "RLMExpirationThreadPool" , "kafka-rlm-expiration-thread-pool-%d " );
261260 followerThreadPool = new RLMScheduledThreadPool (rlmConfig .remoteLogManagerThreadPoolSize (),
262- "RLMFollowerScheduledThreadPool" , "kafka-rlm-follower-thread-pool-" );
261+ "RLMFollowerScheduledThreadPool" , "kafka-rlm-follower-thread-pool-%d " );
263262
264263 metricsGroup .newGauge (REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC , rlmCopyThreadPool ::getIdlePercent );
265264 remoteReadTimer = metricsGroup .newTimer (REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC ,
266265 TimeUnit .MILLISECONDS , TimeUnit .SECONDS );
267266
268267 remoteStorageReaderThreadPool = new RemoteStorageThreadPool (
269- REMOTE_LOG_READER_THREAD_NAME_PREFIX ,
268+ REMOTE_LOG_READER_THREAD_NAME_PATTERN ,
270269 rlmConfig .remoteLogReaderThreads (),
271270 rlmConfig .remoteLogReaderMaxPendingTasks ()
272271 );
@@ -290,6 +289,24 @@ public void updateFetchQuota(long quota) {
290289 rlmFetchQuotaManager .updateQuota (new Quota (quota , true ));
291290 }
292291
292+ public void resizeCopierThreadPool (int newSize ) {
293+ int currentSize = rlmCopyThreadPool .getCorePoolSize ();
294+ LOGGER .info ("Updating remote copy thread pool size from {} to {}" , currentSize , newSize );
295+ rlmCopyThreadPool .setCorePoolSize (newSize );
296+ }
297+
298+ public void resizeExpirationThreadPool (int newSize ) {
299+ int currentSize = rlmExpirationThreadPool .getCorePoolSize ();
300+ LOGGER .info ("Updating remote expiration thread pool size from {} to {}" , currentSize , newSize );
301+ rlmExpirationThreadPool .setCorePoolSize (newSize );
302+ }
303+
304+ public void resizeReaderThreadPool (int newSize ) {
305+ int currentSize = remoteStorageReaderThreadPool .getCorePoolSize ();
306+ LOGGER .info ("Updating remote reader thread pool size from {} to {}" , currentSize , newSize );
307+ remoteStorageReaderThreadPool .setCorePoolSize (newSize );
308+ }
309+
293310 private void removeMetrics () {
294311 metricsGroup .removeMetric (REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC );
295312 metricsGroup .removeMetric (REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC );
@@ -2077,28 +2094,10 @@ public void close() {
20772094 }
20782095 }
20792096
2080- private static void shutdownAndAwaitTermination (ExecutorService pool , String poolName , long timeout , TimeUnit timeUnit ) {
2081- // This pattern of shutting down thread pool is adopted from here: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html
2082- LOGGER .info ("Shutting down of thread pool {} is started" , poolName );
2083- pool .shutdown (); // Disable new tasks from being submitted
2084- try {
2085- // Wait a while for existing tasks to terminate
2086- if (!pool .awaitTermination (timeout , timeUnit )) {
2087- LOGGER .info ("Shutting down of thread pool {} could not be completed. It will retry cancelling the tasks using shutdownNow." , poolName );
2088- pool .shutdownNow (); // Cancel currently executing tasks
2089- // Wait a while for tasks to respond to being cancelled
2090- if (!pool .awaitTermination (timeout , timeUnit ))
2091- LOGGER .warn ("Shutting down of thread pool {} could not be completed even after retrying cancellation of the tasks using shutdownNow." , poolName );
2092- }
2093- } catch (InterruptedException ex ) {
2094- // (Re-)Cancel if current thread also interrupted
2095- LOGGER .warn ("Encountered InterruptedException while shutting down thread pool {}. It will retry cancelling the tasks using shutdownNow." , poolName );
2096- pool .shutdownNow ();
2097- // Preserve interrupt status
2098- Thread .currentThread ().interrupt ();
2099- }
2100-
2101- LOGGER .info ("Shutting down of thread pool {} is completed" , poolName );
2097+ private static void shutdownAndAwaitTermination (ExecutorService executor , String poolName , long timeout , TimeUnit timeUnit ) {
2098+ LOGGER .info ("Shutting down {} executor" , poolName );
2099+ ThreadUtils .shutdownExecutorServiceQuietly (executor , timeout , timeUnit );
2100+ LOGGER .info ("{} executor shutdown completed" , poolName );
21022101 }
21032102
21042103 //Visible for testing
@@ -2152,31 +2151,32 @@ RLMTaskWithFuture followerTask(TopicIdPartition partition) {
21522151 static class RLMScheduledThreadPool {
21532152
21542153 private static final Logger LOGGER = LoggerFactory .getLogger (RLMScheduledThreadPool .class );
2155- private final int poolSize ;
21562154 private final String threadPoolName ;
2157- private final String threadNamePrefix ;
2155+ private final String threadNamePattern ;
21582156 private final ScheduledThreadPoolExecutor scheduledThreadPool ;
21592157
2160- public RLMScheduledThreadPool (int poolSize , String threadPoolName , String threadNamePrefix ) {
2161- this .poolSize = poolSize ;
2158+ public RLMScheduledThreadPool (int poolSize , String threadPoolName , String threadNamePattern ) {
21622159 this .threadPoolName = threadPoolName ;
2163- this .threadNamePrefix = threadNamePrefix ;
2164- scheduledThreadPool = createPool ();
2160+ this .threadNamePattern = threadNamePattern ;
2161+ scheduledThreadPool = createPool (poolSize );
2162+ }
2163+
2164+ public void setCorePoolSize (int newSize ) {
2165+ scheduledThreadPool .setCorePoolSize (newSize );
2166+ }
2167+
2168+ public int getCorePoolSize () {
2169+ return scheduledThreadPool .getCorePoolSize ();
21652170 }
21662171
2167- private ScheduledThreadPoolExecutor createPool () {
2172+ private ScheduledThreadPoolExecutor createPool (int poolSize ) {
2173+ ThreadFactory threadFactory = ThreadUtils .createThreadFactory (threadNamePattern , true ,
2174+ (t , e ) -> LOGGER .error ("Uncaught exception in thread '{}':" , t .getName (), e ));
21682175 ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor (poolSize );
21692176 threadPool .setRemoveOnCancelPolicy (true );
21702177 threadPool .setExecuteExistingDelayedTasksAfterShutdownPolicy (false );
21712178 threadPool .setContinueExistingPeriodicTasksAfterShutdownPolicy (false );
2172- threadPool .setThreadFactory (new ThreadFactory () {
2173- private final AtomicInteger sequence = new AtomicInteger ();
2174-
2175- public Thread newThread (Runnable r ) {
2176- return KafkaThread .daemon (threadNamePrefix + sequence .incrementAndGet (), r );
2177- }
2178- });
2179-
2179+ threadPool .setThreadFactory (threadFactory );
21802180 return threadPool ;
21812181 }
21822182
0 commit comments