4848import java .util .TreeMap ;
4949import java .util .concurrent .CompletableFuture ;
5050import java .util .concurrent .ConcurrentHashMap ;
51+ import java .util .concurrent .locks .Lock ;
52+ import java .util .concurrent .locks .ReentrantLock ;
5153import java .util .stream .Stream ;
5254import java .util .stream .StreamSupport ;
5355
@@ -93,6 +95,8 @@ public final class AtlasRegistry extends AbstractRegistry implements AutoCloseab
9395 private long lastFlushTimestamp = -1L ;
9496 private final ConcurrentHashMap <Id , Consolidator > atlasMeasurements = new ConcurrentHashMap <>();
9597
98+ private final ConcurrentHashMap <String , Lock > publishTaskLocks = new ConcurrentHashMap <>();
99+
96100 /** Create a new instance. */
97101 @ Inject
98102 public AtlasRegistry (Clock clock , AtlasConfig config ) {
@@ -244,8 +248,24 @@ private Timer publishTaskTimer(String id) {
244248 return debugRegistry .timer (PUBLISH_TASK_TIMER , "id" , id );
245249 }
246250
247- synchronized void sendToAtlas () {
248- publishTaskTimer ("sendToAtlas" ).record (() -> {
251+ private void timePublishTask (String id , Runnable task ) {
252+ timePublishTask (id , id , task );
253+ }
254+
255+ private void timePublishTask (String id , String lockName , Runnable task ) {
256+ publishTaskTimer (id ).record (() -> {
257+ Lock lock = publishTaskLocks .computeIfAbsent (lockName , n -> new ReentrantLock ());
258+ lock .lock ();
259+ try {
260+ task .run ();
261+ } finally {
262+ lock .unlock ();
263+ }
264+ });
265+ }
266+
267+ void sendToAtlas () {
268+ timePublishTask ("sendToAtlas" , () -> {
249269 if (config .enabled ()) {
250270 long t = lastCompletedTimestamp (stepMillis );
251271 if (t > lastFlushTimestamp ) {
@@ -272,8 +292,8 @@ synchronized void sendToAtlas() {
272292 });
273293 }
274294
275- synchronized void sendToLWC () {
276- publishTaskTimer ("sendToLWC" ). record ( () -> {
295+ void sendToLWC () {
296+ timePublishTask ("sendToLWC" , () -> {
277297 long t = lastCompletedTimestamp (lwcStepMillis );
278298 //if (config.enabled() || config.lwcEnabled()) {
279299 // If either are enabled we poll the meters for each step interval to flush the
@@ -306,8 +326,8 @@ synchronized void sendToLWC() {
306326 }
307327
308328 /** Collect measurements from all the meters in the registry. */
309- synchronized void pollMeters (long t ) {
310- publishTaskTimer ("pollMeters" ). record ( () -> {
329+ void pollMeters (long t ) {
330+ timePublishTask ("pollMeters" , "atlasMeasurements" , () -> {
311331 if (t > lastPollTimestamp ) {
312332 MeasurementConsumer consumer = (id , timestamp , value ) -> {
313333 // Update the map for data to go to the Atlas storage layer
@@ -365,10 +385,10 @@ private void fetchSubscriptions() {
365385 * Get a list of all consolidated measurements intended to be sent to Atlas and break them
366386 * into batches.
367387 */
368- synchronized List <RollupPolicy .Result > getBatches (long t ) {
388+ List <RollupPolicy .Result > getBatches (long t ) {
369389 final int n = atlasMeasurements .size ();
370390 final List <RollupPolicy .Result > batches = new ArrayList <>(n / batchSize + 1 );
371- publishTaskTimer ("getBatches" ). record ( () -> {
391+ timePublishTask ("getBatches" , "atlasMeasurements" , () -> {
372392 debugRegistry .distributionSummary ("spectator.registrySize" ).record (n );
373393 List <Measurement > input = new ArrayList <>(n );
374394 Iterator <Map .Entry <Id , Consolidator >> it = atlasMeasurements .entrySet ().iterator ();
0 commit comments