Skip to content

Commit 4fe8349

Browse files
committed
Fix Java sampler live upload closing thread pool
1 parent 7e3a173 commit 4fe8349

11 files changed

Lines changed: 84 additions & 54 deletions

File tree

spark-common/src/main/java/me/lucko/spark/common/monitor/MonitoringExecutor.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,14 @@
2020

2121
package me.lucko.spark.common.monitor;
2222

23+
import me.lucko.spark.common.util.SparkScheduledThreadPoolExecutor;
2324
import me.lucko.spark.common.util.SparkThreadFactory;
2425

25-
import java.util.concurrent.Executors;
2626
import java.util.concurrent.ScheduledExecutorService;
2727

2828
public enum MonitoringExecutor {
2929
;
3030

3131
/** The executor used to monitor & calculate rolling averages. */
32-
public static final ScheduledExecutorService INSTANCE = Executors.newSingleThreadScheduledExecutor(r -> {
33-
Thread thread = Executors.defaultThreadFactory().newThread(r);
34-
thread.setName("spark-monitoring-thread");
35-
thread.setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER);
36-
thread.setDaemon(true);
37-
return thread;
38-
});
32+
public static final ScheduledExecutorService INSTANCE = new SparkScheduledThreadPoolExecutor(1, new SparkThreadFactory("spark-monitoring", true));
3933
}

spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncProfilerJob.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import me.lucko.spark.common.sampler.ThreadDumper;
2626
import me.lucko.spark.common.sampler.async.jfr.JfrReader;
2727
import one.profiler.AsyncProfiler;
28-
import org.checkerframework.checker.lock.qual.GuardedBy;
2928

3029
import java.io.IOException;
3130
import java.nio.file.Files;

spark-common/src/main/java/me/lucko/spark/common/sampler/async/AsyncSampler.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
package me.lucko.spark.common.sampler.async;
2222

23-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2423
import me.lucko.spark.common.SparkPlatform;
2524
import me.lucko.spark.common.platform.PlatformInfo;
2625
import me.lucko.spark.common.sampler.AbstractSampler;
@@ -29,12 +28,12 @@
2928
import me.lucko.spark.common.sampler.SamplerType;
3029
import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
3130
import me.lucko.spark.common.tick.TickHook;
31+
import me.lucko.spark.common.util.SparkScheduledThreadPoolExecutor;
3232
import me.lucko.spark.common.util.SparkThreadFactory;
3333
import me.lucko.spark.common.ws.ViewerSocket;
3434
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
3535

3636
import java.util.Locale;
37-
import java.util.concurrent.Executors;
3837
import java.util.concurrent.ScheduledExecutorService;
3938
import java.util.concurrent.ScheduledFuture;
4039
import java.util.concurrent.TimeUnit;
@@ -84,12 +83,7 @@ private AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCol
8483
this.dataAggregator = dataAggregator;
8584
this.forceNanoTime = forceNanoTime;
8685
this.profilerAccess = AsyncProfilerAccess.getInstance(platform);
87-
this.scheduler = Executors.newSingleThreadScheduledExecutor(
88-
new ThreadFactoryBuilder()
89-
.setNameFormat("spark-async-sampler-worker-thread")
90-
.setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER)
91-
.build()
92-
);
86+
this.scheduler = new SparkScheduledThreadPoolExecutor(1, new SparkThreadFactory("spark-async-sampler-worker", false));
9387
}
9488

9589
/**

spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaDataAggregator.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
import me.lucko.spark.common.sampler.node.ThreadNode;
2828

2929
import java.lang.management.ThreadInfo;
30-
import java.util.List;
3130
import java.util.concurrent.ExecutorService;
32-
import java.util.concurrent.TimeUnit;
3331

3432
/**
3533
* Abstract {@link DataAggregator} for the {@link JavaSampler}.
@@ -75,19 +73,6 @@ protected void writeData(ThreadInfo threadInfo, int window) {
7573
}
7674
}
7775

78-
@Override
79-
public List<ThreadNode> exportData() {
80-
// wait for all pending data to be inserted
81-
this.workerPool.shutdown();
82-
try {
83-
this.workerPool.awaitTermination(15, TimeUnit.SECONDS);
84-
} catch (InterruptedException e) {
85-
e.printStackTrace();
86-
}
87-
88-
return super.exportData();
89-
}
90-
9176
static boolean isSleeping(ThreadInfo thread) {
9277
if (thread.getThreadState() == Thread.State.WAITING || thread.getThreadState() == Thread.State.TIMED_WAITING) {
9378
return true;

spark-common/src/main/java/me/lucko/spark/common/sampler/java/JavaSampler.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
package me.lucko.spark.common.sampler.java;
2222

23-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2423
import me.lucko.spark.common.SparkPlatform;
2524
import me.lucko.spark.common.sampler.AbstractSampler;
2625
import me.lucko.spark.common.sampler.SamplerMode;
@@ -30,14 +29,14 @@
3029
import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
3130
import me.lucko.spark.common.tick.TickHook;
3231
import me.lucko.spark.common.util.MethodDisambiguator;
32+
import me.lucko.spark.common.util.SparkScheduledThreadPoolExecutor;
3333
import me.lucko.spark.common.util.SparkThreadFactory;
3434
import me.lucko.spark.common.ws.ViewerSocket;
3535
import me.lucko.spark.proto.SparkSamplerProtos.SamplerData;
3636

3737
import java.lang.management.ManagementFactory;
3838
import java.lang.management.ThreadInfo;
3939
import java.lang.management.ThreadMXBean;
40-
import java.util.concurrent.Executors;
4140
import java.util.concurrent.ScheduledExecutorService;
4241
import java.util.concurrent.ScheduledFuture;
4342
import java.util.concurrent.TimeUnit;
@@ -48,15 +47,9 @@
4847
* A sampler implementation using Java (WarmRoast).
4948
*/
5049
public class JavaSampler extends AbstractSampler implements Runnable {
51-
private static final AtomicInteger THREAD_ID = new AtomicInteger(0);
5250

5351
/** The worker pool for inserting stack nodes */
54-
private final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(
55-
6, new ThreadFactoryBuilder()
56-
.setNameFormat("spark-java-sampler-" + THREAD_ID.getAndIncrement() + "-%d")
57-
.setUncaughtExceptionHandler(SparkThreadFactory.EXCEPTION_HANDLER)
58-
.build()
59-
);
52+
private final ScheduledExecutorService workerPool = new SparkScheduledThreadPoolExecutor(6, new SparkThreadFactory("spark-java-sampler", false));
6053

6154
/** The main sampling task */
6255
private ScheduledFuture<?> task;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* This file is part of spark.
3+
*
4+
* Copyright (c) lucko (Luck) <luck@lucko.me>
5+
* Copyright (c) contributors
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
21+
package me.lucko.spark.common.util;
22+
23+
import org.jspecify.annotations.NonNull;
24+
25+
import java.util.concurrent.CancellationException;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.Future;
28+
import java.util.concurrent.ScheduledThreadPoolExecutor;
29+
import java.util.concurrent.ThreadFactory;
30+
31+
/**
32+
* A {@link ScheduledThreadPoolExecutor} which logs uncaught exceptions.
33+
*/
34+
public class SparkScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
35+
public SparkScheduledThreadPoolExecutor(int corePoolSize) {
36+
super(corePoolSize, new SparkThreadFactory());
37+
}
38+
39+
public SparkScheduledThreadPoolExecutor(int corePoolSize, @NonNull ThreadFactory threadFactory) {
40+
super(corePoolSize, threadFactory);
41+
}
42+
43+
@Override
44+
protected void afterExecute(Runnable r, Throwable t) {
45+
super.afterExecute(r, t);
46+
if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
47+
try {
48+
((Future<?>) r).get();
49+
} catch (CancellationException ce) {
50+
t = ce;
51+
} catch (ExecutionException ee) {
52+
t = ee.getCause();
53+
} catch (InterruptedException ie) {
54+
// ignore/reset
55+
Thread.currentThread().interrupt();
56+
}
57+
}
58+
if (t != null) {
59+
SparkThreadFactory.EXCEPTION_HANDLER.uncaughtException(Thread.currentThread(), t);
60+
}
61+
}
62+
}

spark-common/src/main/java/me/lucko/spark/common/util/SparkThreadFactory.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,30 @@
2828

2929
public class SparkThreadFactory implements ThreadFactory {
3030

31-
public static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER = (t, e) -> {
32-
SparkStaticLogger.log(Level.SEVERE, "Uncaught exception thrown by thread " + t.getName(), e);
31+
static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER = (t, e) -> {
32+
SparkStaticLogger.log(Level.SEVERE, "Uncaught exception thrown in thread " + t.getName(), e);
3333
};
3434

3535
private static final AtomicInteger poolNumber = new AtomicInteger(1);
3636
private final AtomicInteger threadNumber = new AtomicInteger(1);
3737
private final String namePrefix;
38+
private final boolean daemon;
3839

3940
public SparkThreadFactory() {
40-
this.namePrefix = "spark-worker-pool-" +
41+
this("spark-worker-pool", true);
42+
}
43+
44+
public SparkThreadFactory(String prefix, boolean daemon) {
45+
this.namePrefix = prefix + "-" +
4146
poolNumber.getAndIncrement() +
4247
"-thread-";
48+
this.daemon = daemon;
4349
}
4450

4551
public Thread newThread(Runnable r) {
4652
Thread t = new Thread(r, this.namePrefix + this.threadNumber.getAndIncrement());
4753
t.setUncaughtExceptionHandler(EXCEPTION_HANDLER);
48-
t.setDaemon(true);
54+
t.setDaemon(this.daemon);
4955
return t;
5056
}
5157

spark-common/src/test/java/me/lucko/spark/test/plugin/TestSparkPlugin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424
import me.lucko.spark.common.SparkPlugin;
2525
import me.lucko.spark.common.command.sender.CommandSender;
2626
import me.lucko.spark.common.platform.PlatformInfo;
27+
import me.lucko.spark.common.util.SparkScheduledThreadPoolExecutor;
2728
import me.lucko.spark.common.util.classfinder.ClassFinder;
2829
import me.lucko.spark.common.util.classfinder.FallbackClassFinder;
2930

3031
import java.nio.file.Path;
3132
import java.util.Collections;
3233
import java.util.HashMap;
3334
import java.util.Map;
34-
import java.util.concurrent.Executors;
3535
import java.util.concurrent.ScheduledExecutorService;
3636
import java.util.logging.Level;
3737
import java.util.logging.Logger;
@@ -40,7 +40,7 @@
4040
public class TestSparkPlugin implements SparkPlugin, AutoCloseable {
4141

4242
private static final Logger LOGGER = Logger.getLogger("spark-test");
43-
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(16);
43+
private static final ScheduledExecutorService EXECUTOR_SERVICE = new SparkScheduledThreadPoolExecutor(16);
4444

4545
private final Path directory;
4646
private final Map<String, String> props;

spark-fabric/src/main/java/me/lucko/spark/fabric/placeholder/SparkFabricPlaceholderApi.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.gson.JsonParseException;
2424
import com.mojang.serialization.JsonOps;
2525
import eu.pb4.placeholders.api.PlaceholderContext;
26-
import eu.pb4.placeholders.api.PlaceholderHandler;
2726
import eu.pb4.placeholders.api.PlaceholderResult;
2827
import eu.pb4.placeholders.api.Placeholders;
2928
import me.lucko.spark.common.SparkPlatform;

spark-minecraft/src/main/java/me/lucko/spark/minecraft/plugin/MinecraftSparkPlugin.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,14 @@
3333
import me.lucko.spark.common.SparkPlatform;
3434
import me.lucko.spark.common.SparkPlugin;
3535
import me.lucko.spark.common.command.sender.CommandSender;
36-
import me.lucko.spark.common.util.SparkThreadFactory;
36+
import me.lucko.spark.common.util.SparkScheduledThreadPoolExecutor;
3737
import me.lucko.spark.minecraft.SparkMinecraftMod;
3838
import org.apache.logging.log4j.LogManager;
3939
import org.apache.logging.log4j.Logger;
4040

4141
import java.nio.file.Path;
4242
import java.util.Arrays;
4343
import java.util.concurrent.CompletableFuture;
44-
import java.util.concurrent.Executors;
4544
import java.util.concurrent.ScheduledExecutorService;
4645
import java.util.logging.Level;
4746

@@ -56,7 +55,7 @@ public abstract class MinecraftSparkPlugin<M extends SparkMinecraftMod> implemen
5655
protected MinecraftSparkPlugin(M mod) {
5756
this.mod = mod;
5857
this.logger = LogManager.getLogger("spark");
59-
this.scheduler = Executors.newScheduledThreadPool(4, new SparkThreadFactory());
58+
this.scheduler = new SparkScheduledThreadPoolExecutor(4);
6059
}
6160

6261
public void enable() {

0 commit comments

Comments
 (0)