Skip to content

Commit 3b99021

Browse files
SteNicholaspolyzos
authored andcommitted
[client] RemoteLogDownloader should increment remoteFetchBytesPerSecond of ScannerMetricGroup (apache#858)
1 parent 1084496 commit 3b99021

File tree

3 files changed

+44
-14
lines changed

3 files changed

+44
-14
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/metrics/ScannerMetricGroup.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class ScannerMetricGroup extends AbstractMetricGroup {
4747
private final Histogram bytesPerRequest;
4848

4949
// remote log
50-
private final Counter remoteFetchBytesCount;
50+
private final Counter remoteFetchBytes;
5151
private final Counter remoteFetchRequestCount;
5252
private final Counter remoteFetchErrorCount;
5353

@@ -64,8 +64,8 @@ public ScannerMetricGroup(ClientMetricGroup parent, TablePath tablePath) {
6464
fetchRequestCount = new ThreadSafeSimpleCounter();
6565
meter(MetricNames.SCANNER_FETCH_RATE, new MeterView(fetchRequestCount));
6666

67-
remoteFetchBytesCount = new ThreadSafeSimpleCounter();
68-
meter(MetricNames.SCANNER_REMOTE_FETCH_BYTES_RATE, new MeterView(remoteFetchBytesCount));
67+
remoteFetchBytes = new ThreadSafeSimpleCounter();
68+
meter(MetricNames.SCANNER_REMOTE_FETCH_BYTES_RATE, new MeterView(remoteFetchBytes));
6969
remoteFetchRequestCount = new ThreadSafeSimpleCounter();
7070
meter(MetricNames.SCANNER_REMOTE_FETCH_RATE, new MeterView(remoteFetchRequestCount));
7171
remoteFetchErrorCount = new ThreadSafeSimpleCounter();
@@ -90,8 +90,8 @@ public Histogram bytesPerRequest() {
9090
return bytesPerRequest;
9191
}
9292

93-
public Counter remoteFetchBytesCount() {
94-
return remoteFetchBytesCount;
93+
public Counter remoteFetchBytes() {
94+
return remoteFetchBytes;
9595
}
9696

9797
public Counter remoteFetchRequestCount() {

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloader.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ void fetchOnce() throws Exception {
163163
segmentPath,
164164
new CloseableRegistry());
165165
File localFile = new File(segmentPath.toFile(), fsPathAndFileName.getFileName());
166+
scannerMetricGroup.remoteFetchBytes().inc(localFile.length());
166167
String segmentId = request.segment.remoteLogSegmentId().toString();
167168
fetchedFiles.put(segmentId, segmentPath);
168169
request.future.complete(localFile);
@@ -229,6 +230,18 @@ Path getLocalLogDir() {
229230
return localLogDir;
230231
}
231232

233+
protected static FsPathAndFileName getFsPathAndFileName(
234+
FsPath remoteLogTabletDir, RemoteLogSegment segment) {
235+
FsPath remotePath =
236+
remoteLogSegmentFile(
237+
remoteLogSegmentDir(remoteLogTabletDir, segment.remoteLogSegmentId()),
238+
segment.remoteLogStartOffset());
239+
return new FsPathAndFileName(
240+
remotePath,
241+
FlussPaths.filenamePrefixFromOffset(segment.remoteLogStartOffset())
242+
+ LOG_FILE_SUFFIX);
243+
}
244+
232245
/**
233246
* Thread to download remote log files to local. The thread will keep fetching remote log files
234247
* until it is interrupted.
@@ -257,14 +270,7 @@ public RemoteLogDownloadRequest(RemoteLogSegment segment, FsPath remoteLogTablet
257270
}
258271

259272
public FsPathAndFileName getFsPathAndFileName() {
260-
FsPath remotePath =
261-
remoteLogSegmentFile(
262-
remoteLogSegmentDir(remoteLogTabletDir, segment.remoteLogSegmentId()),
263-
segment.remoteLogStartOffset());
264-
return new FsPathAndFileName(
265-
remotePath,
266-
FlussPaths.filenamePrefixFromOffset(segment.remoteLogStartOffset())
267-
+ LOG_FILE_SUFFIX);
273+
return RemoteLogDownloader.getFsPathAndFileName(remoteLogTabletDir, segment);
268274
}
269275
}
270276
}

fluss-client/src/test/java/com/alibaba/fluss/client/table/scanner/log/RemoteLogDownloaderTest.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.alibaba.fluss.client.table.scanner.log;
1818

19+
import com.alibaba.fluss.client.metrics.ScannerMetricGroup;
1920
import com.alibaba.fluss.client.metrics.TestingScannerMetricGroup;
2021
import com.alibaba.fluss.client.table.scanner.RemoteFileDownloader;
2122
import com.alibaba.fluss.config.ConfigOptions;
@@ -56,6 +57,7 @@ class RemoteLogDownloaderTest {
5657
private FsPath remoteLogDir;
5758
private Configuration conf;
5859
private RemoteFileDownloader remoteFileDownloader;
60+
private ScannerMetricGroup scannerMetricGroup;
5961
private RemoteLogDownloader remoteLogDownloader;
6062

6163
@BeforeEach
@@ -66,12 +68,13 @@ void beforeEach() {
6668
conf.set(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM, 4);
6769
remoteLogDir = remoteLogDir(conf);
6870
remoteFileDownloader = new RemoteFileDownloader(1);
71+
scannerMetricGroup = TestingScannerMetricGroup.newInstance();
6972
remoteLogDownloader =
7073
new RemoteLogDownloader(
7174
DATA1_TABLE_PATH,
7275
conf,
7376
remoteFileDownloader,
74-
TestingScannerMetricGroup.newInstance(),
77+
scannerMetricGroup,
7578
// use a short timout for faster testing
7679
10L);
7780
}
@@ -106,12 +109,18 @@ void testPrefetchNum() throws Exception {
106109
});
107110

108111
assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4);
112+
assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(4);
113+
assertThat(scannerMetricGroup.remoteFetchBytes().getCount())
114+
.isEqualTo(remoteLogSegmentFilesLength(remoteLogSegments, remoteLogTabletDir, 4));
109115
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0);
110116

111117
futures.get(0).getRecycleCallback().run();
112118
// the 5th segment should success.
113119
retry(Duration.ofMinutes(1), () -> assertThat(futures.get(4).isDone()).isTrue());
114120
assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(4);
121+
assertThat(scannerMetricGroup.remoteFetchRequestCount().getCount()).isEqualTo(5);
122+
assertThat(scannerMetricGroup.remoteFetchBytes().getCount())
123+
.isEqualTo(remoteLogSegmentFilesLength(remoteLogSegments, remoteLogTabletDir, 5));
115124
assertThat(remoteLogDownloader.getPrefetchSemaphore().availablePermits()).isEqualTo(0);
116125

117126
futures.get(1).getRecycleCallback().run();
@@ -163,4 +172,19 @@ private static List<RemoteLogSegment> buildRemoteLogSegmentList(
163172
}
164173
return remoteLogSegmentList;
165174
}
175+
176+
private static Long remoteLogSegmentFilesLength(
177+
List<RemoteLogSegment> remoteLogSegments, FsPath remoteLogTabletDir, int segmentNum) {
178+
return remoteLogSegments.stream()
179+
.limit(segmentNum)
180+
.mapToLong(
181+
segment ->
182+
new File(
183+
RemoteLogDownloader.getFsPathAndFileName(
184+
remoteLogTabletDir, segment)
185+
.getPath()
186+
.getPath())
187+
.length())
188+
.sum();
189+
}
166190
}

0 commit comments

Comments
 (0)