Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,25 @@ private void initializeQueuedSegments(
AtomicInteger loadingReplicaCount
)
{
for (DataSegment segment : server.iterateAllSegments()) {
final Set<DataSegment> loadedSegments = Set.copyOf(server.iterateAllSegments());

for (DataSegment segment : loadedSegments) {
projectedSegmentCounts.addSegment(segment);
}

// Make a single atomic call to the peon to get all segments still in queue
final List<SegmentHolder> expiredSegments = new ArrayList<>();
for (SegmentHolder holder : peon.getSegmentsInQueue()) {
for (SegmentHolder holder : peon.getSegmentsInQueue(loadedSegments)) {
int runsInQueue = holder.incrementAndGetRunsInQueue();
if (runsInQueue > maxLifetimeInQueue) {
expiredSegments.add(holder);
}

final SegmentAction action = holder.getAction();
addToQueuedSegments(holder.getSegment(), simplify(action));
addToQueuedSegments(
holder.getSegment(),
action == SegmentAction.MOVE_FROM ? SegmentAction.MOVE_FROM : simplify(action)
);

if (action == SegmentAction.MOVE_TO) {
movingSegmentCount.incrementAndGet();
Expand All @@ -167,10 +173,6 @@ private void initializeQueuedSegments(
}
}

for (DataSegment segment : peon.getSegmentsMarkedToDrop()) {
addToQueuedSegments(segment, SegmentAction.MOVE_FROM);
}

if (!expiredSegments.isEmpty()) {
List<SegmentHolder> expiredSegmentsSubList =
expiredSegments.size() > 10 ? expiredSegments.subList(0, 10) : expiredSegments;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.common.config.Configs;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
Expand Down Expand Up @@ -73,6 +74,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
*
Expand All @@ -93,6 +95,8 @@ public class HttpLoadQueuePeon implements LoadQueuePeon

private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentHashMap<>();
private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentHashMap<>();

@GuardedBy("lock")
private final Set<DataSegment> segmentsMarkedToDrop = ConcurrentHashMap.newKeySet();
private final LoadingRateTracker loadingRateTracker = new LoadingRateTracker();

Expand All @@ -101,12 +105,17 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
* drop requests as well. This need not be thread-safe as all operations on it
* are synchronized with the {@link #lock}.
*/
@GuardedBy("lock")
private final Set<SegmentHolder> queuedSegments = new TreeSet<>();

@GuardedBy("lock")
private final Set<SegmentHolder> recentlySucceededActions = new TreeSet<>();

/**
* Set of segments for which requests have been sent to the server and can
* not be cancelled anymore. This need not be thread-safe.
*/
@GuardedBy("lock")
private final Set<DataSegment> activeRequestSegments = new HashSet<>();

private final ScheduledExecutorService processingExecutor;
Expand Down Expand Up @@ -388,15 +397,20 @@ private void handleResponseStatus(DataSegmentChangeRequest changeRequest, Segmen
@Override
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
updateSuccessOrFailureInHolder(segmentsToLoad.remove(segment), status);
synchronized (lock) {
updateSuccessOrFailureInHolder(segmentsToLoad.remove(segment), status);
}
}

@Override
public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
updateSuccessOrFailureInHolder(segmentsToDrop.remove(segment), status);
synchronized (lock) {
updateSuccessOrFailureInHolder(segmentsToDrop.remove(segment), status);
}
}

@GuardedBy("lock")
private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentChangeStatus status)
{
if (holder == null) {
Expand All @@ -410,6 +424,9 @@ private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentChangeS
} else {
onRequestCompleted(holder, RequestStatus.SUCCESS, status);
}

holder.markRequestSucceeded();
recentlySucceededActions.add(holder);
}
}, null
);
Expand Down Expand Up @@ -464,6 +481,7 @@ public void stop()
segmentsToLoad.clear();
queuedSegments.clear();
activeRequestSegments.clear();
recentlySucceededActions.clear();
queuedSize.set(0L);
loadingRateTracker.stop();
stats.get().clear();
Expand Down Expand Up @@ -508,6 +526,8 @@ public void loadSegment(DataSegment segment, SegmentAction action, LoadPeonCallb
public void dropSegment(DataSegment segment, LoadPeonCallback callback)
{
synchronized (lock) {
// Unmark the segment for dropping in case it was already marked
unmarkSegmentToDrop(segment);
if (stopped) {
log.warn(
"Server[%s] cannot drop segment[%s] because load queue peon is stopped.",
Expand Down Expand Up @@ -552,13 +572,44 @@ public Set<DataSegment> getTimedOutSegments()
}

@Override
public Set<SegmentHolder> getSegmentsInQueue()
public Set<SegmentHolder> getSegmentsInQueue(Set<DataSegment> segmentsLoadedOnServer)
{
final Set<SegmentHolder> segmentsInQueue;
final Set<SegmentHolder> queuedActions;
synchronized (lock) {
segmentsInQueue = new HashSet<>(queuedSegments);
queuedActions = new HashSet<>(queuedSegments);
final Set<DataSegment> segmentsInQueue =
queuedActions.stream().map(SegmentHolder::getSegment).collect(Collectors.toSet());

// Check all recently succeeded actions
final Set<SegmentHolder> succeededActions = Set.copyOf(recentlySucceededActions);
for (SegmentHolder holder : succeededActions) {
if (segmentsInQueue.contains(holder.getSegment())) {
// If a recently succeeded segment has been queued again, honor the state in the queue
recentlySucceededActions.remove(holder);
continue;
}

final boolean isSegmentLoaded = segmentsLoadedOnServer.contains(holder.getSegment());

if (holder.isLoad() == isSegmentLoaded) {
// Remove actions that have recently completed and are reflected in the inventory
recentlySucceededActions.remove(holder);
} else if (holder.isStaleSuccessfulRequest()) {
// If the inventory is taking too long to get updated, clean up the state of the peon
recentlySucceededActions.remove(holder);
} else {
// Add actions that have recently completed but are yet to reflect in the inventory
queuedActions.add(holder);
}
}

// Add entries for segments that are currently marked to be dropped
for (DataSegment segment : segmentsMarkedToDrop) {
queuedActions.add(new SegmentHolder(segment, SegmentAction.MOVE_FROM, config.getLoadTimeout(), null));
}
}
return segmentsInQueue;

return queuedActions;
}

@Override
Expand All @@ -582,19 +633,25 @@ public CoordinatorRunStats getAndResetStats()
@Override
public void markSegmentToDrop(DataSegment dataSegment)
{
segmentsMarkedToDrop.add(dataSegment);
synchronized (lock) {
segmentsMarkedToDrop.add(dataSegment);
}
}

@Override
public void unmarkSegmentToDrop(DataSegment dataSegment)
{
segmentsMarkedToDrop.remove(dataSegment);
synchronized (lock) {
segmentsMarkedToDrop.remove(dataSegment);
}
}

@Override
public Set<DataSegment> getSegmentsMarkedToDrop()
{
return Collections.unmodifiableSet(segmentsMarkedToDrop);
synchronized (lock) {
return Collections.unmodifiableSet(segmentsMarkedToDrop);
}
}

private void onRequestFailed(SegmentHolder holder, SegmentChangeStatus status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,16 @@ public interface LoadQueuePeon

Set<DataSegment> getSegmentsToLoad();

Set<SegmentHolder> getSegmentsInQueue();
/**
* Returns the segments currently queued on this peon for a load or drop operation.
* This also includes segments that are being moved from this server to another
* for balancing, and have currently only been marked with {@link #markSegmentToDrop}.
*
* @param segmentsLoadedOnServer Segments which are known to be currently loaded
* on the corresponding server. Used by the peon
* to reconcile its internal state.
*/
Set<SegmentHolder> getSegmentsInQueue(Set<DataSegment> segmentsLoadedOnServer);

Set<DataSegment> getSegmentsToDrop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class SegmentHolder implements Comparable<SegmentHolder>
// Guaranteed to store only non-null elements
private final List<LoadPeonCallback> callbacks = new ArrayList<>();
private final Stopwatch sinceRequestSentToServer = Stopwatch.createUnstarted();
private final Stopwatch sinceRequestSucceeded = Stopwatch.createUnstarted();
private int runsInQueue = 0;

public SegmentHolder(
Expand Down Expand Up @@ -153,6 +154,13 @@ public void markRequestSentToServer()
}
}

public void markRequestSucceeded()
{
if (!sinceRequestSucceeded.isRunning()) {
sinceRequestSucceeded.start();
}
}

/**
* A request is considered to have timed out if the time elapsed since it was
* first sent to the server is greater than the configured load timeout.
Expand All @@ -164,6 +172,15 @@ public boolean hasRequestTimedOut()
return sinceRequestSentToServer.millisElapsed() > requestTimeout.getMillis();
}

/**
* Returns true if it has already been {@link HttpLoadQueuePeonConfig#getLoadTimeout()}
* since this request completed successfully.
*/
public boolean isStaleSuccessfulRequest()
{
return sinceRequestSucceeded.millisElapsed() > requestTimeout.getMillis();
}

public int incrementAndGetRunsInQueue()
{
return ++runsInQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public boolean moveSegment(
&& !peonA.getSegmentsToDrop().contains(segment)
&& (taskMaster.isHttpLoading()
|| serverInventoryView.isSegmentLoadedByServer(serverNameB, segment))) {
peonA.unmarkSegmentToDrop(segment);
peonA.dropSegment(segment, moveFinishCallback);
} else {
moveFinishCallback.execute(success);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public void testRunRuleDoesNotExist()
)
.atLeastOnce();

EasyMock.expect(mockPeon.getSegmentsInQueue()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsInQueue(EasyMock.anyObject())).andReturn(Set.of()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.replay(databaseRuleManager, mockPeon);

Expand Down Expand Up @@ -736,7 +736,7 @@ public void testDropServerActuallyServesSegment()

LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class);
EasyMock.expect(anotherMockPeon.getSegmentsMarkedToDrop()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(anotherMockPeon.getSegmentsInQueue()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(anotherMockPeon.getSegmentsInQueue(EasyMock.anyObject())).andReturn(Set.of()).anyTimes();
EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Collections.emptySet()).anyTimes();

EasyMock.replay(anotherMockPeon);
Expand Down Expand Up @@ -1296,7 +1296,7 @@ private void mockEmptyPeon()
{
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsInQueue()).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(mockPeon.getSegmentsInQueue(EasyMock.anyObject())).andReturn(Set.of()).anyTimes();
EasyMock.replay(mockPeon);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ public ConcurrentSkipListSet<DataSegment> getSegmentsToLoad()
}

@Override
public Set<SegmentHolder> getSegmentsInQueue()
public Set<SegmentHolder> getSegmentsInQueue(Set<DataSegment> segmentsLoadedOnServer)
{
return Collections.emptySet();
return Set.of();
}

@Override
Expand Down
Loading