diff --git a/jaeger-core/src/main/java/com/uber/jaeger/samplers/GuaranteedThroughputSampler.java b/jaeger-core/src/main/java/com/uber/jaeger/samplers/GuaranteedThroughputSampler.java index 38ee56a..90abdb1 100644 --- a/jaeger-core/src/main/java/com/uber/jaeger/samplers/GuaranteedThroughputSampler.java +++ b/jaeger-core/src/main/java/com/uber/jaeger/samplers/GuaranteedThroughputSampler.java @@ -62,7 +62,7 @@ public synchronized boolean update(double samplingRate, double lowerBound) { isUpdated = true; } if (lowerBound != lowerBoundSampler.getMaxTracesPerSecond()) { - lowerBoundSampler = new RateLimitingSampler(lowerBound); + lowerBoundSampler.update(lowerBound); isUpdated = true; } return isUpdated; diff --git a/jaeger-core/src/main/java/com/uber/jaeger/samplers/RateLimitingSampler.java b/jaeger-core/src/main/java/com/uber/jaeger/samplers/RateLimitingSampler.java index 848d192..6ec5226 100644 --- a/jaeger-core/src/main/java/com/uber/jaeger/samplers/RateLimitingSampler.java +++ b/jaeger-core/src/main/java/com/uber/jaeger/samplers/RateLimitingSampler.java @@ -16,7 +16,6 @@ import com.uber.jaeger.Constants; import com.uber.jaeger.utils.RateLimiter; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import lombok.Getter; @@ -26,30 +25,29 @@ @ToString(exclude = "rateLimiter") public class RateLimitingSampler implements Sampler { public static final String TYPE = "ratelimiting"; + private static final double MINUMUM_BALANCE = 1.0; private final RateLimiter rateLimiter; @Getter - private final double maxTracesPerSecond; + private double maxTracesPerSecond; private final Map tags; public RateLimitingSampler(double maxTracesPerSecond) { this.maxTracesPerSecond = maxTracesPerSecond; - double maxBalance = maxTracesPerSecond < 1.0 ? 1.0 : maxTracesPerSecond; - this.rateLimiter = new RateLimiter(maxTracesPerSecond, maxBalance); + this.rateLimiter = new RateLimiter(maxTracesPerSecond, getMaxBalance(maxTracesPerSecond)); - Map tags = new HashMap(); + this.tags = new HashMap(); tags.put(Constants.SAMPLER_TYPE_TAG_KEY, TYPE); tags.put(Constants.SAMPLER_PARAM_TAG_KEY, maxTracesPerSecond); - this.tags = Collections.unmodifiableMap(tags); } @Override - public SamplingStatus sample(String operation, long id) { - return SamplingStatus.of(this.rateLimiter.checkCredit(1.0), tags); + public synchronized SamplingStatus sample(String operation, long id) { + return SamplingStatus.of(this.rateLimiter.checkCredit(MINUMUM_BALANCE), tags); } @Override - public boolean equals(Object other) { + public synchronized boolean equals(Object other) { if (this == other) { return true; } @@ -59,6 +57,20 @@ public boolean equals(Object other) { return false; } + public synchronized boolean update(double maxTracesPerSecond) { + if (this.maxTracesPerSecond == maxTracesPerSecond) { + return false; + } + this.maxTracesPerSecond = maxTracesPerSecond; + rateLimiter.update(maxTracesPerSecond, getMaxBalance(maxTracesPerSecond)); + tags.put(Constants.SAMPLER_PARAM_TAG_KEY, maxTracesPerSecond); + return true; + } + + private double getMaxBalance(double maxTracesPerSecond) { + return maxTracesPerSecond < MINUMUM_BALANCE ? MINUMUM_BALANCE : maxTracesPerSecond; + } + /** * Only implemented to maintain compatibility with sampling interface. */ diff --git a/jaeger-core/src/main/java/com/uber/jaeger/samplers/RemoteControlledSampler.java b/jaeger-core/src/main/java/com/uber/jaeger/samplers/RemoteControlledSampler.java index 76a1db6..ab58dcf 100644 --- a/jaeger-core/src/main/java/com/uber/jaeger/samplers/RemoteControlledSampler.java +++ b/jaeger-core/src/main/java/com/uber/jaeger/samplers/RemoteControlledSampler.java @@ -130,7 +130,7 @@ private void updateRateLimitingOrProbabilisticSampler(SamplingStrategyResponse r sampler = new ProbabilisticSampler(strategy.getSamplingRate()); } else if (response.getRateLimitingSampling() != null) { RateLimitingSamplingStrategy strategy = response.getRateLimitingSampling(); - sampler = new RateLimitingSampler(strategy.getMaxTracesPerSecond()); + sampler = updateRateLimitingSampler(strategy.getMaxTracesPerSecond()); } else { metrics.samplerParsingFailure.inc(1); log.error("No strategy present in response. Not updating sampler."); @@ -145,6 +145,16 @@ private void updateRateLimitingOrProbabilisticSampler(SamplingStrategyResponse r } } + private Sampler updateRateLimitingSampler(double maxTracesPerSecond) { + if (this.sampler instanceof RateLimitingSampler) { + if (((RateLimitingSampler) this.sampler).update(maxTracesPerSecond)) { + metrics.samplerUpdated.inc(1); + } + return this.sampler; + } + return new RateLimitingSampler(maxTracesPerSecond); + } + private synchronized void updatePerOperationSampler(OperationSamplingParameters samplingParameters) { if (sampler instanceof PerOperationSampler) { if (((PerOperationSampler) sampler).update(samplingParameters)) { diff --git a/jaeger-core/src/main/java/com/uber/jaeger/utils/RateLimiter.java b/jaeger-core/src/main/java/com/uber/jaeger/utils/RateLimiter.java index fb7e2e7..ecc649e 100644 --- a/jaeger-core/src/main/java/com/uber/jaeger/utils/RateLimiter.java +++ b/jaeger-core/src/main/java/com/uber/jaeger/utils/RateLimiter.java @@ -14,25 +14,48 @@ package com.uber.jaeger.utils; +import java.util.Random; + public class RateLimiter { - private final double creditsPerNanosecond; + + private static final double SECONDS_IN_NANOSECONDS = 1.0e9; + private static final Random RANDOM = new Random(); + + private double creditsPerNanosecond; private final Clock clock; private double balance; private double maxBalance; private long lastTick; public RateLimiter(double creditsPerSecond, double maxBalance) { - this(creditsPerSecond, maxBalance, new SystemClock()); + this(creditsPerSecond, maxBalance, new SystemClock(), maxBalance * RANDOM.nextDouble()); } - public RateLimiter(double creditsPerSecond, double maxBalance, Clock clock) { + public RateLimiter(double creditsPerSecond, double maxBalance, Clock clock, double initialBalance) { this.clock = clock; - this.balance = maxBalance; + this.balance = initialBalance; + this.maxBalance = maxBalance; + this.creditsPerNanosecond = creditsPerSecond / SECONDS_IN_NANOSECONDS; + } + + public void update(double creditsPerSecond, double maxBalance) { + updateBalance(); + creditsPerNanosecond = creditsPerSecond / SECONDS_IN_NANOSECONDS; + // The new balance should be proportional to the old balance. + balance = maxBalance * balance / this.maxBalance; this.maxBalance = maxBalance; - this.creditsPerNanosecond = creditsPerSecond / 1.0e9; } public boolean checkCredit(double itemCost) { + updateBalance(); + if (balance >= itemCost) { + balance -= itemCost; + return true; + } + return false; + } + + private void updateBalance() { long currentTime = clock.currentNanoTicks(); double elapsedTime = currentTime - lastTick; lastTick = currentTime; @@ -40,10 +63,5 @@ public boolean checkCredit(double itemCost) { if (balance > maxBalance) { balance = maxBalance; } - if (balance >= itemCost) { - balance -= itemCost; - return true; - } - return false; } } diff --git a/jaeger-core/src/test/java/com/uber/jaeger/samplers/TestConstSampler.java b/jaeger-core/src/test/java/com/uber/jaeger/samplers/ConstSamplerTest.java similarity index 96% rename from jaeger-core/src/test/java/com/uber/jaeger/samplers/TestConstSampler.java rename to jaeger-core/src/test/java/com/uber/jaeger/samplers/ConstSamplerTest.java index 22b8bac..502b9be 100644 --- a/jaeger-core/src/test/java/com/uber/jaeger/samplers/TestConstSampler.java +++ b/jaeger-core/src/test/java/com/uber/jaeger/samplers/ConstSamplerTest.java @@ -19,7 +19,7 @@ import java.util.Map; import org.junit.Test; -public class TestConstSampler { +public class ConstSamplerTest { @Test public void testTags() { ConstSampler sampler = new ConstSampler(true); diff --git a/jaeger-core/src/test/java/com/uber/jaeger/samplers/TestRateLimitingSampler.java b/jaeger-core/src/test/java/com/uber/jaeger/samplers/RateLimitingSamplerTest.java similarity index 76% rename from jaeger-core/src/test/java/com/uber/jaeger/samplers/TestRateLimitingSampler.java rename to jaeger-core/src/test/java/com/uber/jaeger/samplers/RateLimitingSamplerTest.java index 610ec58..9306612 100644 --- a/jaeger-core/src/test/java/com/uber/jaeger/samplers/TestRateLimitingSampler.java +++ b/jaeger-core/src/test/java/com/uber/jaeger/samplers/RateLimitingSamplerTest.java @@ -15,16 +15,24 @@ package com.uber.jaeger.samplers; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.util.Map; import org.junit.Test; -public class TestRateLimitingSampler { +public class RateLimitingSamplerTest { @Test public void testTags() { RateLimitingSampler sampler = new RateLimitingSampler(123); Map tags = sampler.sample("operate", 11).getTags(); assertEquals("ratelimiting", tags.get("sampler.type")); assertEquals(123.0, tags.get("sampler.param")); + + assertFalse(sampler.update(123)); + + assertTrue(sampler.update(42)); + tags = sampler.sample("operate", 11).getTags(); + assertEquals(42.0, tags.get("sampler.param")); } } diff --git a/jaeger-core/src/test/java/com/uber/jaeger/samplers/RemoteControlledSamplerTest.java b/jaeger-core/src/test/java/com/uber/jaeger/samplers/RemoteControlledSamplerTest.java index a2d08cc..415f083 100644 --- a/jaeger-core/src/test/java/com/uber/jaeger/samplers/RemoteControlledSamplerTest.java +++ b/jaeger-core/src/test/java/com/uber/jaeger/samplers/RemoteControlledSamplerTest.java @@ -43,13 +43,15 @@ public class RemoteControlledSamplerTest { @Mock private SamplingManager samplingManager; @Mock private Sampler initialSampler; private Metrics metrics; + private InMemoryStatsReporter metricsReporter; private static final String SERVICE_NAME = "thachi mamu"; private RemoteControlledSampler undertest; @Before public void setUp() throws Exception { - metrics = Metrics.fromStatsReporter(new InMemoryStatsReporter()); + metricsReporter = new InMemoryStatsReporter(); + metrics = Metrics.fromStatsReporter(metricsReporter); undertest = new RemoteControlledSampler(SERVICE_NAME, samplingManager, initialSampler, metrics); } @@ -80,6 +82,22 @@ public void testUpdateToRateLimitingSampler() throws Exception { undertest.updateSampler(); assertEquals(new RateLimitingSampler(tracesPerSecond), undertest.getSampler()); + assertEquals(1L, (long) metricsReporter.counters.get("jaeger:sampler_updates.result=ok")); + + // Update should be short circuited if using the same strategy + undertest.updateSampler(); + assertEquals(1L, (long) metricsReporter.counters.get("jaeger:sampler_updates.result=ok")); + + // Try updating an already existing rateLimitingSampler + final int newTracesPerSecond = 24; + rateLimitingResponse = new SamplingStrategyResponse(null, + new RateLimitingSamplingStrategy(newTracesPerSecond), null); + when(samplingManager.getSamplingStrategy(SERVICE_NAME)).thenReturn(rateLimitingResponse); + + undertest.updateSampler(); + + assertEquals(new RateLimitingSampler(newTracesPerSecond), undertest.getSampler()); + assertEquals(2L, (long) metricsReporter.counters.get("jaeger:sampler_updates.result=ok")); } @Test diff --git a/jaeger-core/src/test/java/com/uber/jaeger/utils/RateLimiterTest.java b/jaeger-core/src/test/java/com/uber/jaeger/utils/RateLimiterTest.java index 6d20e2d..a3d3f23 100644 --- a/jaeger-core/src/test/java/com/uber/jaeger/utils/RateLimiterTest.java +++ b/jaeger-core/src/test/java/com/uber/jaeger/utils/RateLimiterTest.java @@ -46,7 +46,7 @@ public boolean isMicrosAccurate() { @Test public void testRateLimiterWholeNumber() { MockClock clock = new MockClock(); - RateLimiter limiter = new RateLimiter(2.0, 2.0, clock); + RateLimiter limiter = new RateLimiter(2.0, 2.0, clock, 2.0); long currentTime = TimeUnit.MICROSECONDS.toNanos(100); clock.timeNanos = currentTime; @@ -79,7 +79,7 @@ public void testRateLimiterWholeNumber() { @Test public void testRateLimiterLessThanOne() { MockClock clock = new MockClock(); - RateLimiter limiter = new RateLimiter(0.5, 0.5, clock); + RateLimiter limiter = new RateLimiter(0.5, 0.5, clock, 0.5); long currentTime = TimeUnit.MICROSECONDS.toNanos(100); clock.timeNanos = currentTime; @@ -112,7 +112,7 @@ public void testRateLimiterLessThanOne() { @Test public void testRateLimiterMaxBalance() { MockClock clock = new MockClock(); - RateLimiter limiter = new RateLimiter(0.1, 1.0, clock); + RateLimiter limiter = new RateLimiter(0.1, 1.0, clock, 1.0); long currentTime = TimeUnit.MICROSECONDS.toNanos(100); clock.timeNanos = currentTime; @@ -126,4 +126,23 @@ public void testRateLimiterMaxBalance() { assertTrue(limiter.checkCredit(1.0)); assertFalse(limiter.checkCredit(1.0)); } + + @Test + public void testRateLimiterUpdate() { + RateLimiter limiter = new RateLimiter(3.0, 3.0, new MockClock(), 3.0); + + // After this call, there should be 2 credits left. + assertTrue(limiter.checkCredit(1.0)); + + // Update to a max balance of 1 should only leave 2/3 credits, not enough for a message. + limiter.update(1.0, 1.0); + assertFalse(limiter.checkCredit(1.0)); + + // Revert back to max balance of 3, there should be 2 credits available. + limiter.update(3.0, 3.0); + assertTrue(limiter.checkCredit(1.0)); + assertTrue(limiter.checkCredit(1.0)); + assertFalse(limiter.checkCredit(1.0)); + } } +