Skip to content

Commit cb63df8

Browse files
[b/343209773] Improve ETA calculation - better handling of longer tasks
1 parent 3e5cc4c commit cb63df8

File tree

4 files changed

+277
-14
lines changed

4 files changed

+277
-14
lines changed

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/DurationFormatter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ public static String formatApproximateDuration(Duration duration) {
2525
long minutes = duration.toMinutes() % 60;
2626
long hours = duration.toHours();
2727

28-
if (hours == 0 && minutes == 0) {
29-
return "less than one minute";
28+
if (hours == 0 && minutes < 15) {
29+
return "less than 15 minutes";
3030
}
3131

3232
ImmutableList.Builder<String> tokens = ImmutableList.builder();

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/TasksRunner.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@
3535
import java.nio.charset.StandardCharsets;
3636
import java.sql.SQLException;
3737
import java.time.Duration;
38+
import java.util.ArrayDeque;
3839
import java.util.Arrays;
40+
import java.util.Collections;
41+
import java.util.Deque;
3942
import java.util.List;
4043
import java.util.concurrent.atomic.AtomicInteger;
4144
import javax.annotation.CheckForNull;
@@ -57,6 +60,9 @@ public class TasksRunner implements TaskRunContextOps {
5760
private final TaskSetState.Impl state;
5861
private final List<Task<?>> tasks;
5962

63+
// For tracking last 10 finished task durations
64+
private final Deque<Duration> lastTaskDurations = new ArrayDeque<>(10);
65+
6066
public TasksRunner(
6167
OutputHandleFactory sinkFactory,
6268
Handle handle,
@@ -102,15 +108,40 @@ private <T> T handleTask(Task<T> task) throws MetadataDumperUsageException {
102108
T t = runTask(task);
103109
if (!(task instanceof TaskGroup)) {
104110
numberOfCompletedTasks.getAndIncrement();
111+
112+
Duration taskDuration = stopwatch.elapsed();
113+
if (lastTaskDurations.size() == 10) {
114+
lastTaskDurations.removeFirst();
115+
}
116+
lastTaskDurations.addLast(taskDuration);
105117
}
106118
logProgress();
107119
return t;
108120
}
109121

122+
private Duration getAverageTaskDurationFromAllTasks() {
123+
return stopwatch.elapsed().dividedBy(max(1, numberOfCompletedTasks.get()));
124+
}
125+
126+
private Duration getAverageTaskDurationFromLatestTasks() {
127+
if (lastTaskDurations.isEmpty()) {
128+
return Duration.ZERO;
129+
}
130+
Duration total = lastTaskDurations.getLast().minus(lastTaskDurations.getFirst());
131+
132+
return total.dividedBy(lastTaskDurations.size());
133+
}
134+
135+
private Duration getTaskDuration() {
136+
return Collections.max(
137+
Arrays.asList(
138+
getAverageTaskDurationFromAllTasks(), getAverageTaskDurationFromLatestTasks()));
139+
}
140+
110141
private void logProgress() {
111142
int numberOfCompletedTasks = this.numberOfCompletedTasks.get();
112143

113-
Duration averageTimePerTask = stopwatch.elapsed().dividedBy(max(1, numberOfCompletedTasks));
144+
Duration averageTimePerTask = getTaskDuration();
114145

115146
int percentFinished = numberOfCompletedTasks * 100 / totalNumberOfTasks;
116147
String progressMessage = percentFinished + "% Completed";

dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/DurationFormatterTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@ public class DurationFormatterTest {
3333
@DataPoints("durations")
3434
public static final ImmutableList<Pair<Duration, String>> DURATIONS =
3535
ImmutableList.of(
36-
Pair.of(Duration.ZERO, "less than one minute"),
37-
Pair.of(Duration.ofMillis(1), "less than one minute"),
38-
Pair.of(Duration.ofSeconds(1), "less than one minute"),
39-
Pair.of(Duration.ofSeconds(59), "less than one minute"),
40-
Pair.of(Duration.ofMinutes(1), "~1 minute"),
41-
Pair.of(Duration.ofMinutes(1).plusSeconds(1), "~1 minute"),
42-
Pair.of(Duration.ofMinutes(1).plusSeconds(59), "~1 minute"),
43-
Pair.of(Duration.ofMinutes(2), "~2 minutes"),
44-
Pair.of(Duration.ofMinutes(2).plusSeconds(1), "~2 minutes"),
45-
Pair.of(Duration.ofMinutes(2).plusSeconds(59), "~2 minutes"),
46-
Pair.of(Duration.ofMinutes(11), "~11 minutes"),
36+
Pair.of(Duration.ZERO, "less than 15 minutes"),
37+
Pair.of(Duration.ofMillis(1), "less than 15 minutes"),
38+
Pair.of(Duration.ofSeconds(1), "less than 15 minutes"),
39+
Pair.of(Duration.ofSeconds(59), "less than 15 minutes"),
40+
Pair.of(Duration.ofMinutes(1), "less than 15 minutes"),
41+
Pair.of(Duration.ofMinutes(1).plusSeconds(1),"less than 15 minutes"),
42+
Pair.of(Duration.ofMinutes(1).plusSeconds(59),"less than 15 minutes"),
43+
Pair.of(Duration.ofMinutes(2), "less than 15 minutes"),
44+
Pair.of(Duration.ofMinutes(2).plusSeconds(1), "less than 15 minutes"),
45+
Pair.of(Duration.ofMinutes(2).plusSeconds(59), "less than 15 minutes"),
46+
Pair.of(Duration.ofMinutes(11), "less than 15 minutes"),
4747
Pair.of(Duration.ofMinutes(21), "~21 minutes"),
4848
Pair.of(Duration.ofMinutes(34), "~34 minutes"),
4949
Pair.of(Duration.ofMinutes(59).plusSeconds(59).plusMillis(999), "~59 minutes"),
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertTrue;
22+
import static org.junit.Assert.fail;
23+
import static org.mockito.Mockito.*;
24+
25+
import ch.qos.logback.classic.Logger;
26+
import ch.qos.logback.classic.spi.ILoggingEvent;
27+
import ch.qos.logback.core.read.ListAppender;
28+
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
29+
import com.google.edwmigration.dumper.application.dumper.io.OutputHandleFactory;
30+
import com.google.edwmigration.dumper.application.dumper.task.Task;
31+
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
32+
import com.google.edwmigration.dumper.application.dumper.task.TaskSetState;
33+
import java.io.IOException;
34+
import java.time.Duration;
35+
import java.util.Collections;
36+
import java.util.Deque;
37+
import java.util.List;
38+
39+
import org.junit.Before;
40+
import org.junit.Rule;
41+
import org.junit.Test;
42+
import org.junit.contrib.java.lang.system.SystemOutRule;
43+
import org.junit.runner.RunWith;
44+
import org.junit.runners.JUnit4;
45+
import org.slf4j.LoggerFactory;
46+
47+
@RunWith(JUnit4.class)
48+
public class TasksRunnerTest {
49+
@Rule
50+
public final SystemOutRule systemOutRule = new SystemOutRule().enableLog();
51+
52+
@Before
53+
public void setUp() {
54+
System.setProperty("org.slf4j.simpleLogger.logFile", "System.out");
55+
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); }
56+
57+
@Test
58+
public void testCreateContext_returnsValidTaskRunContext() throws IOException {
59+
OutputHandleFactory mockSinkFactory = mock(OutputHandleFactory.class);
60+
Handle mockHandle = mock(Handle.class);
61+
int threadPoolSize = 2;
62+
TaskSetState.Impl mockState = mock(TaskSetState.Impl.class);
63+
ConnectorArguments arguments = new ConnectorArguments("--connector", "test");
64+
TasksRunner runner =
65+
new TasksRunner(
66+
mockSinkFactory,
67+
mockHandle,
68+
threadPoolSize,
69+
mockState,
70+
Collections.emptyList(),
71+
arguments);
72+
73+
// Use reflection to access the private createContext method for direct testing
74+
try {
75+
java.lang.reflect.Method method =
76+
TasksRunner.class.getDeclaredMethod(
77+
"createContext",
78+
OutputHandleFactory.class,
79+
Handle.class,
80+
int.class,
81+
ConnectorArguments.class);
82+
method.setAccessible(true);
83+
Object context =
84+
method.invoke(runner, mockSinkFactory, mockHandle, threadPoolSize, arguments);
85+
86+
assertNotNull(context);
87+
assertTrue(context instanceof TaskRunContext);
88+
89+
TaskRunContext taskRunContext = (TaskRunContext) context;
90+
assertEquals(mockHandle, taskRunContext.getHandle());
91+
assertEquals(arguments, taskRunContext.getArguments());
92+
} catch (Exception e) {
93+
fail("Reflection failed: " + e.getMessage());
94+
}
95+
}
96+
97+
@Test
98+
public void testGetTaskDuration_ReturnsMaxOfAllAndLatest() throws Exception {
99+
OutputHandleFactory mockSinkFactory = mock(OutputHandleFactory.class);
100+
Handle mockHandle = mock(Handle.class);
101+
int threadPoolSize = 2;
102+
TaskSetState.Impl mockState = mock(TaskSetState.Impl.class);
103+
ConnectorArguments arguments = new ConnectorArguments("--connector", "test");
104+
List<Task<?>> tasks = Collections.nCopies(10, mock(Task.class));
105+
TasksRunner runner =
106+
new TasksRunner(mockSinkFactory, mockHandle, threadPoolSize, mockState, tasks, arguments);
107+
108+
// Set numberOfCompletedTasks to 5
109+
java.lang.reflect.Field completedField =
110+
TasksRunner.class.getDeclaredField("numberOfCompletedTasks");
111+
completedField.setAccessible(true);
112+
completedField.set(runner, new java.util.concurrent.atomic.AtomicInteger(5));
113+
114+
// Mock stopwatch to control elapsed time
115+
java.lang.reflect.Field stopwatchField =
116+
TasksRunner.class.getDeclaredField("stopwatch");
117+
stopwatchField.setAccessible(true);
118+
com.google.common.base.Stopwatch mockStopwatch = mock(com.google.common.base.Stopwatch.class);
119+
when(mockStopwatch.elapsed()).thenReturn(Duration.ofSeconds(50));
120+
stopwatchField.set(runner, mockStopwatch);
121+
122+
// Fill lastTaskDurations with 5 durations: 10, 20, 30, 40, 50 seconds
123+
java.lang.reflect.Field durationsField =
124+
TasksRunner.class.getDeclaredField("lastTaskDurations");
125+
durationsField.setAccessible(true);
126+
@SuppressWarnings("unchecked")
127+
Deque<Duration> durations = (Deque<Duration>) durationsField.get(runner);
128+
durations.clear();
129+
durations.add(Duration.ofSeconds(10));
130+
durations.add(Duration.ofSeconds(20));
131+
durations.add(Duration.ofSeconds(30));
132+
durations.add(Duration.ofSeconds(40));
133+
durations.add(Duration.ofSeconds(50));
134+
135+
// getAverageTaskDurationFromAllTasks = 50s / 5 = 10s
136+
// getAverageTaskDurationFromLatestTasks = (50s - 10s) / 5 = 8s
137+
// getTaskDuration should return max(10s, 8s) = 10s
138+
java.lang.reflect.Method getTaskDuration = TasksRunner.class.getDeclaredMethod("getTaskDuration");
139+
getTaskDuration.setAccessible(true);
140+
Duration result = (Duration) getTaskDuration.invoke(runner);
141+
142+
assertEquals(Duration.ofSeconds(10), result);
143+
}
144+
145+
@Test
146+
public void testGetTaskDuration_EmptyLastTaskDurations() throws Exception {
147+
OutputHandleFactory mockSinkFactory = mock(OutputHandleFactory.class);
148+
Handle mockHandle = mock(Handle.class);
149+
int threadPoolSize = 2;
150+
TaskSetState.Impl mockState = mock(TaskSetState.Impl.class);
151+
ConnectorArguments arguments = new ConnectorArguments("--connector", "test");
152+
List<Task<?>> tasks = Collections.nCopies(3, mock(Task.class));
153+
TasksRunner runner =
154+
new TasksRunner(mockSinkFactory, mockHandle, threadPoolSize, mockState, tasks, arguments);
155+
156+
// Set numberOfCompletedTasks to 3
157+
java.lang.reflect.Field completedField =
158+
TasksRunner.class.getDeclaredField("numberOfCompletedTasks");
159+
completedField.setAccessible(true);
160+
completedField.set(runner, new java.util.concurrent.atomic.AtomicInteger(3));
161+
162+
// Mock stopwatch to control elapsed time
163+
java.lang.reflect.Field stopwatchField =
164+
TasksRunner.class.getDeclaredField("stopwatch");
165+
stopwatchField.setAccessible(true);
166+
com.google.common.base.Stopwatch mockStopwatch = mock(com.google.common.base.Stopwatch.class);
167+
when(mockStopwatch.elapsed()).thenReturn(Duration.ofSeconds(9));
168+
stopwatchField.set(runner, mockStopwatch);
169+
170+
// Ensure lastTaskDurations is empty
171+
java.lang.reflect.Field durationsField =
172+
TasksRunner.class.getDeclaredField("lastTaskDurations");
173+
durationsField.setAccessible(true);
174+
@SuppressWarnings("unchecked")
175+
Deque<Duration> durations = (Deque<Duration>) durationsField.get(runner);
176+
durations.clear();
177+
178+
// getAverageTaskDurationFromAllTasks = 9s / 3 = 3s
179+
// getAverageTaskDurationFromLatestTasks = 0s
180+
// getTaskDuration should return max(3s, 0s) = 3s
181+
java.lang.reflect.Method getTaskDuration = TasksRunner.class.getDeclaredMethod("getTaskDuration");
182+
getTaskDuration.setAccessible(true);
183+
Duration result = (Duration) getTaskDuration.invoke(runner);
184+
185+
assertEquals(Duration.ofSeconds(3), result);
186+
}
187+
188+
@Test
189+
public void testGetTaskDuration_LastTaskDurationsGreaterThanAllTasks() throws Exception {
190+
OutputHandleFactory mockSinkFactory = mock(OutputHandleFactory.class);
191+
Handle mockHandle = mock(Handle.class);
192+
int threadPoolSize = 2;
193+
TaskSetState.Impl mockState = mock(TaskSetState.Impl.class);
194+
ConnectorArguments arguments = new ConnectorArguments("--connector", "test");
195+
List<Task<?>> tasks = Collections.nCopies(4, mock(Task.class));
196+
TasksRunner runner =
197+
new TasksRunner(mockSinkFactory, mockHandle, threadPoolSize, mockState, tasks, arguments);
198+
199+
// Set numberOfCompletedTasks to 2
200+
java.lang.reflect.Field completedField =
201+
TasksRunner.class.getDeclaredField("numberOfCompletedTasks");
202+
completedField.setAccessible(true);
203+
completedField.set(runner, new java.util.concurrent.atomic.AtomicInteger(2));
204+
205+
// Mock stopwatch to control elapsed time
206+
java.lang.reflect.Field stopwatchField =
207+
TasksRunner.class.getDeclaredField("stopwatch");
208+
stopwatchField.setAccessible(true);
209+
com.google.common.base.Stopwatch mockStopwatch = mock(com.google.common.base.Stopwatch.class);
210+
when(mockStopwatch.elapsed()).thenReturn(Duration.ofSeconds(6));
211+
stopwatchField.set(runner, mockStopwatch);
212+
213+
// Fill lastTaskDurations with 2 durations: 2s, 10s
214+
java.lang.reflect.Field durationsField =
215+
TasksRunner.class.getDeclaredField("lastTaskDurations");
216+
durationsField.setAccessible(true);
217+
@SuppressWarnings("unchecked")
218+
Deque<Duration> durations = (Deque<Duration>) durationsField.get(runner);
219+
durations.clear();
220+
durations.add(Duration.ofSeconds(2));
221+
durations.add(Duration.ofSeconds(10));
222+
223+
// getAverageTaskDurationFromAllTasks = 6s / 2 = 3s
224+
// getAverageTaskDurationFromLatestTasks = (10s - 2s) / 2 = 4s
225+
// getTaskDuration should return max(3s, 4s) = 4s
226+
java.lang.reflect.Method getTaskDuration = TasksRunner.class.getDeclaredMethod("getTaskDuration");
227+
getTaskDuration.setAccessible(true);
228+
Duration result = (Duration) getTaskDuration.invoke(runner);
229+
230+
assertEquals(Duration.ofSeconds(4), result);
231+
}
232+
}

0 commit comments

Comments
 (0)