Skip to content
This repository was archived by the owner on Sep 13, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public synchronized boolean update(double samplingRate, double lowerBound) {
isUpdated = true;
}
if (lowerBound != lowerBoundSampler.getMaxTracesPerSecond()) {
lowerBoundSampler = new RateLimitingSampler(lowerBound);
lowerBoundSampler.update(lowerBound);
isUpdated = true;
}
return isUpdated;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.uber.jaeger.Constants;
import com.uber.jaeger.utils.RateLimiter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
Expand All @@ -26,30 +25,29 @@
@ToString(exclude = "rateLimiter")
public class RateLimitingSampler implements Sampler {
public static final String TYPE = "ratelimiting";
private static final double MINUMUM_BALANCE = 1.0;

private final RateLimiter rateLimiter;
@Getter
private final double maxTracesPerSecond;
private double maxTracesPerSecond;
private final Map<String, Object> tags;

public RateLimitingSampler(double maxTracesPerSecond) {
this.maxTracesPerSecond = maxTracesPerSecond;
double maxBalance = maxTracesPerSecond < 1.0 ? 1.0 : maxTracesPerSecond;
this.rateLimiter = new RateLimiter(maxTracesPerSecond, maxBalance);
this.rateLimiter = new RateLimiter(maxTracesPerSecond, getMaxBalance(maxTracesPerSecond));

Map<String, Object> tags = new HashMap<String, Object>();
this.tags = new HashMap<String, Object>();
tags.put(Constants.SAMPLER_TYPE_TAG_KEY, TYPE);
tags.put(Constants.SAMPLER_PARAM_TAG_KEY, maxTracesPerSecond);
this.tags = Collections.unmodifiableMap(tags);
}

@Override
public SamplingStatus sample(String operation, long id) {
return SamplingStatus.of(this.rateLimiter.checkCredit(1.0), tags);
public synchronized SamplingStatus sample(String operation, long id) {
return SamplingStatus.of(this.rateLimiter.checkCredit(MINUMUM_BALANCE), tags);
}

@Override
public boolean equals(Object other) {
public synchronized boolean equals(Object other) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be synchronized?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this function and the update function are called at the same time, there could(?) be a race condition on the variable maxTracesPerSecond. Might be overkill

if (this == other) {
return true;
}
Expand All @@ -59,6 +57,20 @@ public boolean equals(Object other) {
return false;
}

public synchronized boolean update(double maxTracesPerSecond) {
if (this.maxTracesPerSecond == maxTracesPerSecond) {
return false;
}
this.maxTracesPerSecond = maxTracesPerSecond;
rateLimiter.update(maxTracesPerSecond, getMaxBalance(maxTracesPerSecond));
tags.put(Constants.SAMPLER_PARAM_TAG_KEY, maxTracesPerSecond);
return true;
}

private double getMaxBalance(double maxTracesPerSecond) {
return maxTracesPerSecond < MINUMUM_BALANCE ? MINUMUM_BALANCE : maxTracesPerSecond;
}

/**
* Only implemented to maintain compatibility with sampling interface.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private void updateRateLimitingOrProbabilisticSampler(SamplingStrategyResponse r
sampler = new ProbabilisticSampler(strategy.getSamplingRate());
} else if (response.getRateLimitingSampling() != null) {
RateLimitingSamplingStrategy strategy = response.getRateLimitingSampling();
sampler = new RateLimitingSampler(strategy.getMaxTracesPerSecond());
sampler = updateRateLimitingSampler(strategy.getMaxTracesPerSecond());
} else {
metrics.samplerParsingFailure.inc(1);
log.error("No strategy present in response. Not updating sampler.");
Expand All @@ -145,6 +145,16 @@ private void updateRateLimitingOrProbabilisticSampler(SamplingStrategyResponse r
}
}

private Sampler updateRateLimitingSampler(double maxTracesPerSecond) {
if (this.sampler instanceof RateLimitingSampler) {
if (((RateLimitingSampler) this.sampler).update(maxTracesPerSecond)) {
metrics.samplerUpdated.inc(1);
}
return this.sampler;
}
return new RateLimitingSampler(maxTracesPerSecond);
}

private synchronized void updatePerOperationSampler(OperationSamplingParameters samplingParameters) {
if (sampler instanceof PerOperationSampler) {
if (((PerOperationSampler) sampler).update(samplingParameters)) {
Expand Down
38 changes: 28 additions & 10 deletions jaeger-core/src/main/java/com/uber/jaeger/utils/RateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,54 @@

package com.uber.jaeger.utils;

import java.util.Random;

public class RateLimiter {
private final double creditsPerNanosecond;

private static final double SECONDS_IN_NANOSECONDS = 1.0e9;
private static final Random RANDOM = new Random();

private double creditsPerNanosecond;
private final Clock clock;
private double balance;
private double maxBalance;
private long lastTick;

public RateLimiter(double creditsPerSecond, double maxBalance) {
this(creditsPerSecond, maxBalance, new SystemClock());
this(creditsPerSecond, maxBalance, new SystemClock(), maxBalance * RANDOM.nextDouble());
}

public RateLimiter(double creditsPerSecond, double maxBalance, Clock clock) {
public RateLimiter(double creditsPerSecond, double maxBalance, Clock clock, double initialBalance) {
this.clock = clock;
this.balance = maxBalance;
this.balance = initialBalance;
this.maxBalance = maxBalance;
this.creditsPerNanosecond = creditsPerSecond / SECONDS_IN_NANOSECONDS;
}

public void update(double creditsPerSecond, double maxBalance) {
updateBalance();
creditsPerNanosecond = creditsPerSecond / SECONDS_IN_NANOSECONDS;
// The new balance should be proportional to the old balance.
balance = maxBalance * balance / this.maxBalance;
this.maxBalance = maxBalance;
this.creditsPerNanosecond = creditsPerSecond / 1.0e9;
}

public boolean checkCredit(double itemCost) {
updateBalance();
if (balance >= itemCost) {
balance -= itemCost;
return true;
}
return false;
}

private void updateBalance() {
long currentTime = clock.currentNanoTicks();
double elapsedTime = currentTime - lastTick;
lastTick = currentTime;
balance += elapsedTime * creditsPerNanosecond;
if (balance > maxBalance) {
balance = maxBalance;
}
if (balance >= itemCost) {
balance -= itemCost;
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.Map;
import org.junit.Test;

public class TestConstSampler {
public class ConstSamplerTest {
@Test
public void testTags() {
ConstSampler sampler = new ConstSampler(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,24 @@
package com.uber.jaeger.samplers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.Map;
import org.junit.Test;

public class TestRateLimitingSampler {
public class RateLimitingSamplerTest {
@Test
public void testTags() {
RateLimitingSampler sampler = new RateLimitingSampler(123);
Map<String, Object> tags = sampler.sample("operate", 11).getTags();
assertEquals("ratelimiting", tags.get("sampler.type"));
assertEquals(123.0, tags.get("sampler.param"));

assertFalse(sampler.update(123));

assertTrue(sampler.update(42));
tags = sampler.sample("operate", 11).getTags();
assertEquals(42.0, tags.get("sampler.param"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ public class RemoteControlledSamplerTest {
@Mock private SamplingManager samplingManager;
@Mock private Sampler initialSampler;
private Metrics metrics;
private InMemoryStatsReporter metricsReporter;
private static final String SERVICE_NAME = "thachi mamu";

private RemoteControlledSampler undertest;

@Before
public void setUp() throws Exception {
metrics = Metrics.fromStatsReporter(new InMemoryStatsReporter());
metricsReporter = new InMemoryStatsReporter();
metrics = Metrics.fromStatsReporter(metricsReporter);
undertest = new RemoteControlledSampler(SERVICE_NAME, samplingManager, initialSampler, metrics);
}

Expand Down Expand Up @@ -80,6 +82,22 @@ public void testUpdateToRateLimitingSampler() throws Exception {
undertest.updateSampler();

assertEquals(new RateLimitingSampler(tracesPerSecond), undertest.getSampler());
assertEquals(1L, (long) metricsReporter.counters.get("jaeger:sampler_updates.result=ok"));

// Update should be short circuited if using the same strategy
undertest.updateSampler();
assertEquals(1L, (long) metricsReporter.counters.get("jaeger:sampler_updates.result=ok"));

// Try updating an already existing rateLimitingSampler
final int newTracesPerSecond = 24;
rateLimitingResponse = new SamplingStrategyResponse(null,
new RateLimitingSamplingStrategy(newTracesPerSecond), null);
when(samplingManager.getSamplingStrategy(SERVICE_NAME)).thenReturn(rateLimitingResponse);

undertest.updateSampler();

assertEquals(new RateLimitingSampler(newTracesPerSecond), undertest.getSampler());
assertEquals(2L, (long) metricsReporter.counters.get("jaeger:sampler_updates.result=ok"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public boolean isMicrosAccurate() {
@Test
public void testRateLimiterWholeNumber() {
MockClock clock = new MockClock();
RateLimiter limiter = new RateLimiter(2.0, 2.0, clock);
RateLimiter limiter = new RateLimiter(2.0, 2.0, clock, 2.0);

long currentTime = TimeUnit.MICROSECONDS.toNanos(100);
clock.timeNanos = currentTime;
Expand Down Expand Up @@ -79,7 +79,7 @@ public void testRateLimiterWholeNumber() {
@Test
public void testRateLimiterLessThanOne() {
MockClock clock = new MockClock();
RateLimiter limiter = new RateLimiter(0.5, 0.5, clock);
RateLimiter limiter = new RateLimiter(0.5, 0.5, clock, 0.5);

long currentTime = TimeUnit.MICROSECONDS.toNanos(100);
clock.timeNanos = currentTime;
Expand Down Expand Up @@ -112,7 +112,7 @@ public void testRateLimiterLessThanOne() {
@Test
public void testRateLimiterMaxBalance() {
MockClock clock = new MockClock();
RateLimiter limiter = new RateLimiter(0.1, 1.0, clock);
RateLimiter limiter = new RateLimiter(0.1, 1.0, clock, 1.0);

long currentTime = TimeUnit.MICROSECONDS.toNanos(100);
clock.timeNanos = currentTime;
Expand All @@ -126,4 +126,23 @@ public void testRateLimiterMaxBalance() {
assertTrue(limiter.checkCredit(1.0));
assertFalse(limiter.checkCredit(1.0));
}

@Test
public void testRateLimiterUpdate() {
RateLimiter limiter = new RateLimiter(3.0, 3.0, new MockClock(), 3.0);

// After this call, there should be 2 credits left.
assertTrue(limiter.checkCredit(1.0));

// Update to a max balance of 1 should only leave 2/3 credits, not enough for a message.
limiter.update(1.0, 1.0);
assertFalse(limiter.checkCredit(1.0));

// Revert back to max balance of 3, there should be 2 credits available.
limiter.update(3.0, 3.0);
assertTrue(limiter.checkCredit(1.0));
assertTrue(limiter.checkCredit(1.0));
assertFalse(limiter.checkCredit(1.0));
}
}