diff --git a/common/build.gradle b/common/build.gradle index ad3c70b00..41505e6d5 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -22,5 +22,8 @@ if (tasks.findByName('trimShadedJar')) { keep "class com.linecorp.centraldogma.internal.shaded.caffeine.** { *; }" // Prevent ProGuard from removing all enum values from Option because otherwise it becomes a non-enum class. keep "class com.linecorp.centraldogma.internal.shaded.jsonpath.Option { *; }" + + // Reduces the verbosity of ProGuardTask when running in parallel. + dontnote } } diff --git a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTaskTest.java b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTaskTest.java index 4b9381365..c3ffdc930 100644 --- a/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTaskTest.java +++ b/server-mirror-git/src/test/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTaskTest.java @@ -45,10 +45,10 @@ void testSuccessMetrics() { Mirror mirror = newMirror("git://a.com/b.git", GitMirror.class, "foo", "bar"); mirror = spy(mirror); doNothing().when(mirror).mirror(any(), any(), anyInt(), anyLong()); - new MirroringTask(mirror, meterRegistry).run(null, null, 0, 0L); + new MirroringTask(mirror, "foo", meterRegistry).run(null, null, 0, 0L); assertThat(MoreMeters.measureAll(meterRegistry)) .contains(entry("mirroring.result#count{direction=LOCAL_TO_REMOTE,localPath=/," + - "localRepo=bar,remoteBranch=,remotePath=/,success=true}", 1.0)); + "localRepo=bar,project=foo,remoteBranch=,remotePath=/,success=true}", 1.0)); } @Test @@ -58,12 +58,12 @@ void testFailureMetrics() { mirror = spy(mirror); final RuntimeException e = new RuntimeException(); doThrow(e).when(mirror).mirror(any(), any(), anyInt(), anyLong()); - final MirroringTask task = new MirroringTask(mirror, meterRegistry); + final MirroringTask task = new MirroringTask(mirror, "foo", meterRegistry); assertThatThrownBy(() -> task.run(null, null, 0, 0L)) .isSameAs(e); assertThat(MoreMeters.measureAll(meterRegistry)) .contains(entry("mirroring.result#count{direction=LOCAL_TO_REMOTE,localPath=/," + - "localRepo=bar,remoteBranch=main,remotePath=/," + + "localRepo=bar,project=foo,remoteBranch=main,remotePath=/," + "success=false}", 1.0)); } @@ -76,11 +76,11 @@ void testTimerMetrics() { Thread.sleep(1000); return null; }).when(mirror).mirror(any(), any(), anyInt(), anyLong()); - new MirroringTask(mirror, meterRegistry).run(null, null, 0, 0L); + new MirroringTask(mirror, "foo", meterRegistry).run(null, null, 0, 0L); assertThat(MoreMeters.measureAll(meterRegistry)) .hasEntrySatisfying( "mirroring.task#total{direction=LOCAL_TO_REMOTE,localPath=/," + - "localRepo=bar,remoteBranch=,remotePath=/}", + "localRepo=bar,project=foo,remoteBranch=,remotePath=/}", v -> assertThat(v).isCloseTo(1, withPercentage(30))); } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/api/ContentServiceV1.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/api/ContentServiceV1.java index 757194d00..3e8c04fac 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/api/ContentServiceV1.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/api/ContentServiceV1.java @@ -24,6 +24,7 @@ import static com.linecorp.centraldogma.internal.Util.isValidFilePath; import static com.linecorp.centraldogma.server.internal.api.DtoConverter.convert; import static com.linecorp.centraldogma.server.internal.api.HttpApiUtil.returnOrThrow; +import static com.linecorp.centraldogma.server.internal.api.RepositoryServiceV1.increaseCounterIfOldRevisionUsed; import static com.linecorp.centraldogma.server.internal.storage.repository.DefaultMetaRepository.metaRepoFiles; import static java.util.Objects.requireNonNull; @@ -114,11 +115,13 @@ public ContentServiceV1(ProjectManager projectManager, CommandExecutor executor, *

Returns the list of files in the path. */ @Get("regex:/projects/(?[^/]+)/repos/(?[^/]+)/list(?(|/.*))$") - public CompletableFuture>> listFiles(@Param String path, + public CompletableFuture>> listFiles(ServiceRequestContext ctx, + @Param String path, @Param @Default("-1") String revision, Repository repository) { final String normalizedPath = normalizePath(path); final Revision normalizedRev = repository.normalizeNow(new Revision(revision)); + increaseCounterIfOldRevisionUsed(ctx, repository, normalizedRev); final CompletableFuture>> future = new CompletableFuture<>(); listFiles(repository, normalizedPath, normalizedRev, false, future); return future; @@ -214,12 +217,14 @@ private CompletableFuture push(long commitTimeMills, Author author, Re */ @Post("/projects/{projectName}/repos/{repoName}/preview") public CompletableFuture>> preview( + ServiceRequestContext ctx, @Param @Default("-1") String revision, Repository repository, @RequestConverter(ChangesRequestConverter.class) Iterable> changes) { - + final Revision baseRevision = new Revision(revision); + increaseCounterIfOldRevisionUsed(ctx, repository, baseRevision); final CompletableFuture>> changesFuture = - repository.previewDiff(new Revision(revision), changes); + repository.previewDiff(baseRevision, changes); return changesFuture.thenApply(previewDiffs -> previewDiffs.values().stream() .map(DtoConverter::convert) @@ -245,6 +250,7 @@ public CompletableFuture getFiles( Repository repository, @RequestConverter(WatchRequestConverter.class) @Nullable WatchRequest watchRequest, @RequestConverter(QueryRequestConverter.class) @Nullable Query query) { + increaseCounterIfOldRevisionUsed(ctx, repository, new Revision(revision)); final String normalizedPath = normalizePath(path); // watch repository or a file @@ -325,7 +331,8 @@ private static Object handleWatchFailure(Throwable thrown) { * specify {@code to}, this will return the list of commits. */ @Get("regex:/projects/(?[^/]+)/repos/(?[^/]+)/commits(?(|/.*))$") - public CompletableFuture listCommits(@Param String revision, + public CompletableFuture listCommits(ServiceRequestContext ctx, + @Param String revision, @Param @Default("/**") String path, @Param @Nullable String to, @Param @Nullable Integer maxCommits, @@ -346,6 +353,10 @@ public CompletableFuture listCommits(@Param String revision, } final RevisionRange range = repository.normalizeNow(fromRevision, toRevision).toDescending(); + + increaseCounterIfOldRevisionUsed(ctx, repository, range.from()); + increaseCounterIfOldRevisionUsed(ctx, repository, range.to()); + final int maxCommits0 = firstNonNull(maxCommits, Repository.DEFAULT_MAX_COMMITS); return repository .history(range.from(), range.to(), normalizePath(path), maxCommits0) @@ -368,17 +379,21 @@ public CompletableFuture listCommits(@Param String revision, */ @Get("/projects/{projectName}/repos/{repoName}/compare") public CompletableFuture getDiff( + ServiceRequestContext ctx, @Param @Default("/**") String pathPattern, @Param @Default("1") String from, @Param @Default("head") String to, Repository repository, @RequestConverter(QueryRequestConverter.class) @Nullable Query query) { - + final Revision fromRevision = new Revision(from); + final Revision toRevision = new Revision(to); + increaseCounterIfOldRevisionUsed(ctx, repository, fromRevision); + increaseCounterIfOldRevisionUsed(ctx, repository, toRevision); if (query != null) { - return repository.diff(new Revision(from), new Revision(to), query) + return repository.diff(fromRevision, toRevision, query) .thenApply(DtoConverter::convert); } else { return repository - .diff(new Revision(from), new Revision(to), normalizePath(pathPattern)) + .diff(fromRevision, toRevision, normalizePath(pathPattern)) .thenApply(changeMap -> changeMap.values().stream() .map(DtoConverter::convert).collect(toImmutableList())); } @@ -402,9 +417,12 @@ private static Object objectOrList(Collection collection, boolean toList, */ @Get("/projects/{projectName}/repos/{repoName}/merge") public CompletableFuture> mergeFiles( + ServiceRequestContext ctx, @Param @Default("-1") String revision, Repository repository, @RequestConverter(MergeQueryRequestConverter.class) MergeQuery query) { - return repository.mergeFiles(new Revision(revision), query).thenApply(DtoConverter::convert); + final Revision rev = new Revision(revision); + increaseCounterIfOldRevisionUsed(ctx, repository, rev); + return repository.mergeFiles(rev, query).thenApply(DtoConverter::convert); } /** diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/api/RepositoryServiceV1.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/api/RepositoryServiceV1.java index 655d7ea99..60359ef94 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/api/RepositoryServiceV1.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/api/RepositoryServiceV1.java @@ -16,6 +16,7 @@ package com.linecorp.centraldogma.server.internal.api; +import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.linecorp.centraldogma.server.internal.api.HttpApiUtil.checkUnremoveArgument; import static com.linecorp.centraldogma.server.internal.api.HttpApiUtil.returnOrThrow; @@ -28,9 +29,11 @@ import javax.annotation.Nullable; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.logging.RequestOnlyLog; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.annotation.Consumes; import com.linecorp.armeria.server.annotation.Delete; @@ -58,6 +61,8 @@ import com.linecorp.centraldogma.server.storage.project.ProjectManager; import com.linecorp.centraldogma.server.storage.repository.Repository; +import io.micrometer.core.instrument.Tag; + /** * Annotated service object for managing repositories. */ @@ -186,8 +191,48 @@ public CompletableFuture patchRepository(@Param String repoName, */ @Get("/projects/{projectName}/repos/{repoName}/revision/{revision}") @RequiresReadPermission - public Map normalizeRevision(Repository repository, @Param String revision) { + public Map normalizeRevision(ServiceRequestContext ctx, + Repository repository, @Param String revision) { final Revision normalizedRevision = repository.normalizeNow(new Revision(revision)); + final Revision head = repository.normalizeNow(Revision.HEAD); + increaseCounterIfOldRevisionUsed(ctx, repository, normalizedRevision, head); return ImmutableMap.of("revision", normalizedRevision.major()); } + + static void increaseCounterIfOldRevisionUsed(ServiceRequestContext ctx, Repository repository, + Revision revision) { + final Revision normalized = repository.normalizeNow(revision); + final Revision head = repository.normalizeNow(Revision.HEAD); + increaseCounterIfOldRevisionUsed(ctx, repository, normalized, head); + } + + public static void increaseCounterIfOldRevisionUsed( + ServiceRequestContext ctx, Repository repository, Revision normalized, Revision head) { + final String projectName = repository.parent().name(); + final String repoName = repository.name(); + if (normalized.major() == 1) { + ctx.log().whenRequestComplete().thenAccept( + log -> ctx.meterRegistry() + .counter("revisions.init", generateTags(projectName, repoName, log).build()) + .increment()); + } + if (head.major() - normalized.major() >= 5000) { + ctx.log().whenRequestComplete().thenAccept( + log -> ctx.meterRegistry() + .summary("revisions.old", + generateTags(projectName, repoName, log) + .add(Tag.of("init", Boolean.toString(normalized.major() == 1))) + .build()) + .record(head.major() - normalized.major())); + } + } + + private static ImmutableList.Builder generateTags( + String projectName, String repoName, RequestOnlyLog log) { + final ImmutableList.Builder builder = ImmutableList.builder(); + return builder.add(Tag.of("project", projectName), + Tag.of("repo", repoName), + Tag.of("service", firstNonNull(log.serviceName(), "none")), + Tag.of("method", log.name())); + } } diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMirroringService.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMirroringService.java index 89e37756c..fefb7f579 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMirroringService.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/DefaultMirroringService.java @@ -22,6 +22,7 @@ import java.io.File; import java.time.Duration; import java.time.ZonedDateTime; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -29,7 +30,6 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; import javax.annotation.Nullable; @@ -188,35 +188,25 @@ private void schedulePendingMirrors() { final ZonedDateTime currentLastExecutionTime = lastExecutionTime; lastExecutionTime = now; - projectManager.list().values().stream() - .map(Project::metaRepo) - .flatMap(r -> { + projectManager.list() + .values() + .forEach(project -> { + final Set mirrors; try { - return r.mirrors().stream(); + mirrors = project.metaRepo().mirrors(); } catch (Exception e) { - logger.warn("Failed to load the mirror list from: {}", r.parent().name(), e); - return Stream.empty(); + logger.warn("Failed to load the mirror list from: {}", project.name(), e); + return; } - }) - .filter(m -> { - try { - return m.nextExecutionTime(currentLastExecutionTime).compareTo(now) < 0; - } catch (Exception e) { - logger.warn("Failed to calculate the next execution time of: {}", m, e); - return false; - } - }) - .forEach(m -> { - final ListenableFuture future = worker.submit(() -> run(m, true)); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable Object result) {} - - @Override - public void onFailure(Throwable cause) { - logger.warn("Unexpected Git mirroring failure: {}", m, cause); + mirrors.forEach(m -> { + try { + if (m.nextExecutionTime(currentLastExecutionTime).compareTo(now) < 0) { + run(project, m); + } + } catch (Exception e) { + logger.warn("Unexpected exception while mirroring: {}", m, e); } - }, MoreExecutors.directExecutor()); + }); }); } @@ -229,14 +219,27 @@ public CompletableFuture mirror() { return CompletableFuture.runAsync( () -> projectManager.list().values() .forEach(p -> p.metaRepo().mirrors() - .forEach(m -> run(m, false))), + .forEach(m -> run(m, p.name(), false))), worker); } - private void run(Mirror m, boolean logOnFailure) { + private void run(Project project, Mirror m) { + final ListenableFuture future = worker.submit(() -> run(m, project.name(), true)); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Object result) {} + + @Override + public void onFailure(Throwable cause) { + logger.warn("Unexpected Git mirroring failure: {}", m, cause); + } + }, MoreExecutors.directExecutor()); + } + + private void run(Mirror m, String projectName, boolean logOnFailure) { logger.info("Mirroring: {}", m); try { - new MirroringTask(m, meterRegistry) + new MirroringTask(m, projectName, meterRegistry) .run(workDir, commandExecutor, maxNumFilesPerMirror, maxNumBytesPerMirror); } catch (Exception e) { if (logOnFailure) { diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTask.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTask.java index 14fd9ad57..1342599b0 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTask.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/mirror/MirroringTask.java @@ -31,8 +31,9 @@ final class MirroringTask { - private static Iterable generateTags(Mirror mirror) { + private static Iterable generateTags(Mirror mirror, String projectName) { return ImmutableList.of( + Tag.of("project", projectName), Tag.of("direction", mirror.direction().name()), Tag.of("remoteBranch", firstNonNull(mirror.remoteBranch(), "")), Tag.of("remotePath", mirror.remotePath()), @@ -44,10 +45,10 @@ private static Iterable generateTags(Mirror mirror) { private final Mirror mirror; private final Iterable tags; - MirroringTask(Mirror mirror, MeterRegistry meterRegistry) { + MirroringTask(Mirror mirror, String projectName, MeterRegistry meterRegistry) { this.mirror = mirror; this.meterRegistry = meterRegistry; - tags = generateTags(mirror); + tags = generateTags(mirror, projectName); } private Counter counter(boolean success) { diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/thrift/CentralDogmaServiceImpl.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/thrift/CentralDogmaServiceImpl.java index 8265629c0..681d5f460 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/thrift/CentralDogmaServiceImpl.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/thrift/CentralDogmaServiceImpl.java @@ -17,7 +17,9 @@ import static com.google.common.base.MoreObjects.firstNonNull; import static com.linecorp.centraldogma.common.Author.SYSTEM; +import static com.linecorp.centraldogma.common.Revision.HEAD; import static com.linecorp.centraldogma.server.internal.api.ContentServiceV1.checkPush; +import static com.linecorp.centraldogma.server.internal.api.RepositoryServiceV1.increaseCounterIfOldRevisionUsed; import static com.linecorp.centraldogma.server.internal.thrift.Converter.convert; import static com.linecorp.centraldogma.server.storage.project.Project.isReservedRepoName; import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ALL_WITHOUT_CONTENT; @@ -36,6 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.centraldogma.internal.thrift.Author; import com.linecorp.centraldogma.internal.thrift.CentralDogmaConstants; @@ -218,15 +221,26 @@ public void normalizeRevision(String projectName, String repositoryName, Revisio AsyncMethodCallback resultHandler) { final com.linecorp.centraldogma.common.Revision normalized = - projectManager.get(projectName).repos().get(repositoryName) - .normalizeNow(convert(revision)); + normalizeRevision(projectName, repositoryName, revision); + resultHandler.onComplete(convert(normalized)); } + private com.linecorp.centraldogma.common.Revision normalizeRevision( + String projectName, String repositoryName, Revision revision) { + final Repository repository = projectManager.get(projectName).repos().get(repositoryName); + final com.linecorp.centraldogma.common.Revision normalized = + repository.normalizeNow(convert(revision)); + final com.linecorp.centraldogma.common.Revision head = repository.normalizeNow(HEAD); + increaseCounterIfOldRevisionUsed(RequestContext.current(), repository, normalized, head); + return normalized; + } + @Override public void listFiles(String projectName, String repositoryName, Revision revision, String pathPattern, AsyncMethodCallback resultHandler) { - + // Call normalizeRevision() first to check if the specified revision needs to be recorded. + normalizeRevision(projectName, repositoryName, revision); handle(projectManager.get(projectName).repos().get(repositoryName) .find(convert(revision), pathPattern, FIND_ALL_WITHOUT_CONTENT) .thenApply(entries -> { @@ -241,7 +255,8 @@ public void listFiles(String projectName, String repositoryName, Revision revisi @Override public void getFiles(String projectName, String repositoryName, Revision revision, String pathPattern, AsyncMethodCallback resultHandler) { - + // Call normalizeRevision() first to check if the specified revision needs to be recorded. + normalizeRevision(projectName, repositoryName, revision); handle(projectManager.get(projectName).repos().get(repositoryName) .find(convert(revision), pathPattern) .thenApply(entries -> { @@ -257,7 +272,9 @@ public void getFiles(String projectName, String repositoryName, Revision revisio @Override public void getHistory(String projectName, String repositoryName, Revision from, Revision to, String pathPattern, AsyncMethodCallback resultHandler) { - + // Call normalizeRevision() first to check if the specified revision needs to be recorded. + normalizeRevision(projectName, repositoryName, from); + normalizeRevision(projectName, repositoryName, to); handle(projectManager.get(projectName).repos().get(repositoryName) .history(convert(from), convert(to), pathPattern) .thenApply(commits -> commits.stream() @@ -269,7 +286,9 @@ public void getHistory(String projectName, String repositoryName, Revision from, @Override public void getDiffs(String projectName, String repositoryName, Revision from, Revision to, String pathPattern, AsyncMethodCallback resultHandler) { - + // Call normalizeRevision() first to check if the specified revision needs to be recorded. + normalizeRevision(projectName, repositoryName, from); + normalizeRevision(projectName, repositoryName, to); handle(projectManager.get(projectName).repos().get(repositoryName) .diff(convert(from), convert(to), pathPattern) .thenApply(diffs -> convert(diffs.values(), Converter::convert)), @@ -279,7 +298,8 @@ public void getDiffs(String projectName, String repositoryName, Revision from, R @Override public void getPreviewDiffs(String projectName, String repositoryName, Revision baseRevision, List changes, AsyncMethodCallback resultHandler) { - + // Call normalizeRevision() first to check if the specified revision needs to be recorded. + normalizeRevision(projectName, repositoryName, baseRevision); handle(projectManager.get(projectName).repos().get(repositoryName) .previewDiff(convert(baseRevision), convert(changes, Converter::convert)) .thenApply(diffs -> convert(diffs.values(), Converter::convert)), @@ -313,7 +333,8 @@ public void push(String projectName, String repositoryName, Revision baseRevisio @Override public void getFile(String projectName, String repositoryName, Revision revision, Query query, AsyncMethodCallback resultHandler) { - + // Call normalizeRevision() first to check if the specified revision needs to be recorded. + normalizeRevision(projectName, repositoryName, revision); handle(projectManager.get(projectName).repos().get(repositoryName) .get(convert(revision), convert(query)) .thenApply(res -> new GetFileResult(convert(res.type()), res.contentAsText())), @@ -323,7 +344,9 @@ public void getFile(String projectName, String repositoryName, Revision revision @Override public void diffFile(String projectName, String repositoryName, Revision from, Revision to, Query query, AsyncMethodCallback resultHandler) { - + // Call normalizeRevision() first to check if the specified revision needs to be recorded. + normalizeRevision(projectName, repositoryName, from); + normalizeRevision(projectName, repositoryName, to); // FIXME(trustin): Remove the firstNonNull() on the change content once we make it optional. handle(projectManager.get(projectName).repos().get(repositoryName) .diff(convert(from), convert(to), convert(query)) @@ -335,6 +358,8 @@ public void diffFile(String projectName, String repositoryName, Revision from, R @Override public void mergeFiles(String projectName, String repositoryName, Revision revision, MergeQuery mergeQuery, AsyncMethodCallback resultHandler) { + // Call normalizeRevision() first to check if the specified revision needs to be recorded. + normalizeRevision(projectName, repositoryName, revision); handle(projectManager.get(projectName).repos().get(repositoryName) .mergeFiles(convert(revision), convert(mergeQuery)) .thenApply(merged -> new MergedEntry(convert(merged.revision()), @@ -348,7 +373,8 @@ public void mergeFiles(String projectName, String repositoryName, Revision revis public void watchRepository( String projectName, String repositoryName, Revision lastKnownRevision, String pathPattern, long timeoutMillis, AsyncMethodCallback resultHandler) { - + // Call normalizeRevision() first to check if the specified revision needs to be recorded. + normalizeRevision(projectName, repositoryName, lastKnownRevision); if (timeoutMillis <= 0) { rejectInvalidWatchTimeout("watchRepository", resultHandler); return; @@ -384,7 +410,8 @@ private static void handleWatchRepositoryResult( public void watchFile( String projectName, String repositoryName, Revision lastKnownRevision, Query query, long timeoutMillis, AsyncMethodCallback resultHandler) { - + // Call normalizeRevision() first to check if the specified revision needs to be recorded. + normalizeRevision(projectName, repositoryName, lastKnownRevision); if (timeoutMillis <= 0) { rejectInvalidWatchTimeout("watchFile", resultHandler); return; diff --git a/server/src/test/java/com/linecorp/centraldogma/server/MetricsTest.java b/server/src/test/java/com/linecorp/centraldogma/server/MetricsTest.java index 1d98fefd0..8a7447289 100644 --- a/server/src/test/java/com/linecorp/centraldogma/server/MetricsTest.java +++ b/server/src/test/java/com/linecorp/centraldogma/server/MetricsTest.java @@ -18,14 +18,21 @@ import static org.assertj.core.api.Assertions.assertThat; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; +import com.fasterxml.jackson.databind.JsonNode; + import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.client.CentralDogmaRepository; +import com.linecorp.centraldogma.client.WatchRequest; import com.linecorp.centraldogma.common.Change; import com.linecorp.centraldogma.common.Query; +import com.linecorp.centraldogma.common.Revision; +import com.linecorp.centraldogma.testing.internal.FlakyTest; import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; import io.micrometer.core.instrument.MeterRegistry; @@ -33,6 +40,8 @@ import io.micrometer.prometheus.PrometheusMeterRegistry; import io.prometheus.client.exporter.common.TextFormat; +@FlakyTest +@Timeout(30) class MetricsTest { @RegisterExtension @@ -55,7 +64,7 @@ void metrics() { assertThat(((CompositeMeterRegistry) meterRegistry).getRegistries()) .hasAtLeastOneElementOfType(PrometheusMeterRegistry.class); - AggregatedHttpResponse res = dogma.httpClient().get("/monitor/metrics").aggregate().join(); + final AggregatedHttpResponse res = dogma.httpClient().get("/monitor/metrics").aggregate().join(); String content = res.contentUtf8(); assertThat(res.status()).isEqualTo(HttpStatus.OK); assertThat(res.contentType()).isEqualTo(MediaType.parse(TextFormat.CONTENT_TYPE_004)); @@ -63,15 +72,36 @@ void metrics() { assertThat(content).doesNotContain( "com.linecorp.centraldogma.server.internal.api.WatchContentServiceV1"); - dogma.client() - .forRepo("foo", "bar") - .watch(Query.ofJson("/foo.json")) - .timeoutMillis(100) - .errorOnEntryNotFound(false) - .start() - .join(); - res = dogma.httpClient().get("/monitor/metrics").aggregate().join(); - content = res.contentUtf8(); + final WatchRequest jsonNodeWatchRequest = dogma.client() + .forRepo("foo", "bar") + .watch(Query.ofJson("/foo.json")) + .timeoutMillis(100) + .errorOnEntryNotFound(false); + jsonNodeWatchRequest.start().join(); + content = dogma.httpClient().get("/monitor/metrics").aggregate().join().contentUtf8(); assertThat(content).contains("com.linecorp.centraldogma.server.internal.api.WatchContentServiceV1"); + assertThat(content).doesNotContain("revisions_init"); + + // Trigger old revision recording + jsonNodeWatchRequest.start(Revision.INIT).join(); + content = dogma.httpClient().get("/monitor/metrics").aggregate().join().contentUtf8(); + assertThat(content).contains("revisions_init"); + assertThat(content).doesNotContain("revisions_old"); + + final CentralDogmaRepository centralDogmaRepo = dogma.client().forRepo("foo", "bar"); + for (int i = 0; i < 5000; i++) { + centralDogmaRepo.commit("Add a commit", Change.ofTextUpsert("/foo.txt", Integer.toString(i))) + .push().join(); + } + + jsonNodeWatchRequest.start(Revision.INIT).join(); + content = dogma.httpClient().get("/monitor/metrics").aggregate().join().contentUtf8(); + assertThat(content).contains("revisions_old"); + assertThat(content).contains("init=\"true\""); + assertThat(content).doesNotContain("init=\"false\""); + + jsonNodeWatchRequest.start(new Revision(2)).join(); + content = dogma.httpClient().get("/monitor/metrics").aggregate().join().contentUtf8(); + assertThat(content).contains("init=\"false\""); } }