Skip to content

MINOR: refactoring periodic tasks to wrap a ControllerWriteOperation #19245

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.kafka.controller;

import org.apache.kafka.controller.errors.PeriodicControlTaskException;

import java.util.EnumSet;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand All @@ -29,11 +32,12 @@ class PeriodicTask {
private final String name;

/**
* The callback for this task. If ControllerResult.response is true, we will schedule the
* task again after only a very short delay. This is useful if we only finished part of the
* work we wanted to finish.
* The write operation for this periodic task. It contains two callbacks, one for generating records
* and a controller result, and one for processing the end offset of the batch.
* If ControllerResult.response is true, we will schedule the task again after only a very short delay.
* This is useful if we only finished part of the work we wanted to finish.
*/
private final Supplier<ControllerResult<Boolean>> op;
private final QuorumController.ControllerWriteOperation<Boolean> writeOp;

/**
* The period of the task when ControllerResult.response is true, in nanoseconds.
Expand All @@ -54,37 +58,44 @@ class PeriodicTask {

PeriodicTask(
String name,
Supplier<ControllerResult<Boolean>> op,
QuorumController.ControllerWriteOperation<Boolean> writeOp,
long periodNs,
EnumSet<PeriodicTaskFlag> flags
) {
this.name = name;
this.op = op;
this.immediatePeriodNs = DEFAULT_IMMEDIATE_PERIOD_NS;
this.periodNs = periodNs;
this.flags = flags;
this(name, writeOp, periodNs, flags, DEFAULT_IMMEDIATE_PERIOD_NS);
}

PeriodicTask(
String name,
Supplier<ControllerResult<Boolean>> op,
QuorumController.ControllerWriteOperation<Boolean> writeOp,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to have a separate callback like a Consumer<Long> that we can pass in here. Making people implement QuorumController.ControllerWriteOperation feels a bit messy since that's an inner class of QuorumController. It's also a bit confusing (to me, at least)

long periodNs,
EnumSet<PeriodicTaskFlag> flags,
long immediatePeriodNs
) {
this.name = name;
this.op = op;
this.immediatePeriodNs = immediatePeriodNs;
this.writeOp = writeOp;
this.periodNs = periodNs;
this.flags = flags;
this.immediatePeriodNs = immediatePeriodNs;
}

String name() {
return name;
}

Supplier<ControllerResult<Boolean>> op() {
return op;
return () -> {
try {
return writeOp.generateRecordsAndResult();
} catch (Exception e) {
throw new PeriodicControlTaskException(name + ": periodic task failed: " +
e.getMessage(), e);
}
};
}

Consumer<Long> processBatchEndOffsetOp() {
return writeOp::processBatchEndOffset;
}

long immediatePeriodNs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -66,21 +65,21 @@ interface QueueAccessor {
void scheduleDeferred(
String tag,
long deadlineNs,
Supplier<ControllerResult<Void>> op
QuorumController.ControllerWriteOperation<Void> op
);

void cancelDeferred(String tag);
}

class PeriodicTaskOperation implements Supplier<ControllerResult<Void>> {
class PeriodicTaskOperation implements QuorumController.ControllerWriteOperation<Void> {
private final PeriodicTask task;

PeriodicTaskOperation(PeriodicTask task) {
this.task = task;
}

@Override
public ControllerResult<Void> get() {
public ControllerResult<Void> generateRecordsAndResult() {
long startNs = 0;
if (log.isDebugEnabled() || task.flags().contains(PeriodicTaskFlag.VERBOSE)) {
startNs = time.nanoseconds();
Expand Down Expand Up @@ -115,6 +114,11 @@ public ControllerResult<Void> get() {
return ControllerResult.of(result.records(), null);
}
}

@Override
public void processBatchEndOffset(long offset) {
task.processBatchEndOffsetOp().accept(offset);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,12 +518,12 @@ class PeriodicTaskControlManagerQueueAccessor implements PeriodicTaskControlMana
public void scheduleDeferred(
String tag,
long deadlineNs,
Supplier<ControllerResult<Void>> op
QuorumController.ControllerWriteOperation<Void> op
) {
EnumSet<ControllerOperationFlag> flags = EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME);
queue.scheduleDeferred(tag,
new EarliestDeadlineFunction(deadlineNs),
new ControllerWriteEvent<>(tag, op::get, flags));
new ControllerWriteEvent<>(tag, op, flags));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,21 @@ static class PeriodicTaskControlManagerTestEnv implements PeriodicTaskControlMan
public void scheduleDeferred(
String tag,
long deadlineNs,
Supplier<ControllerResult<Void>> op
QuorumController.ControllerWriteOperation<Void> op
) {
if (numCalls <= 0) {
throw new RuntimeException("too many deferred calls.");
}
numCalls--;
cancelDeferred(tag);
TrackedTask task = new TrackedTask(tag, deadlineNs, op);
Supplier<ControllerResult<Void>> taskOp = () -> {
try {
return op.generateRecordsAndResult();
} catch (Exception e) {
throw new RuntimeException(e);
}
};
TrackedTask task = new TrackedTask(tag, deadlineNs, taskOp);
tasks.computeIfAbsent(deadlineNs, __ -> new ArrayList<>()).add(task);
}

Expand Down