Skip to content

Commit 2fa0d0a

Browse files
[OPIK-2597] [BE] Separate thread related functionality to a dedicated service and DAO (#4465)
* Separate thread related functionality to a dedicated service and DAO * fix DI
1 parent f22cebd commit 2fa0d0a

File tree

9 files changed

+1605
-1503
lines changed

9 files changed

+1605
-1503
lines changed

apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/TracesResource.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.comet.opik.domain.FeedbackScoreService;
3838
import com.comet.opik.domain.ProjectService;
3939
import com.comet.opik.domain.Streamer;
40+
import com.comet.opik.domain.ThreadService;
4041
import com.comet.opik.domain.TraceSearchCriteria;
4142
import com.comet.opik.domain.TraceService;
4243
import com.comet.opik.domain.threads.TraceThreadService;
@@ -105,6 +106,7 @@
105106
public class TracesResource {
106107

107108
private final @NonNull TraceService service;
109+
private final @NonNull ThreadService threadService;
108110
private final @NonNull FeedbackScoreService feedbackScoreService;
109111
private final @NonNull CommentService commentService;
110112
private final @NonNull FiltersFactory filtersFactory;
@@ -639,7 +641,7 @@ public Response getTraceThreads(
639641

640642
log.info("Get trace threads by '{}' on workspaceId '{}'", searchCriteria, workspaceId);
641643

642-
TraceThreadPage traceThreadPage = service.getTraceThreads(page, size, searchCriteria)
644+
TraceThreadPage traceThreadPage = threadService.find(page, size, searchCriteria)
643645
.map(it -> {
644646
// Remove sortableBy fields if dynamic sorting is disabled due to workspace size
645647
if (metadata.cannotUseDynamicSorting()) {
@@ -690,7 +692,7 @@ public ChunkedOutput<JsonNode> searchTraceThreads(
690692
.uuidToTime(instantToUUIDMapper.toUpperBound(request.toTime()))
691693
.build();
692694

693-
Flux<TraceThread> items = service.threadsSearch(request.limit(), searchCriteria)
695+
Flux<TraceThread> items = threadService.search(request.limit(), searchCriteria)
694696
.contextWrite(ctx -> ctx.put(RequestContext.WORKSPACE_ID, workspaceId)
695697
.put(RequestContext.USER_NAME, userName)
696698
.put(RequestContext.VISIBILITY, Optional.ofNullable(visibility).orElse(Visibility.PRIVATE)));
@@ -716,7 +718,7 @@ public Response getTraceThread(
716718
log.info("Getting trace thread by id '{}' and project id '{}' on workspace_id '{}' with truncate '{}'",
717719
identifier.threadId(), projectId, workspaceId, identifier.truncate());
718720

719-
TraceThread thread = service.getThreadById(projectId, identifier.threadId(), identifier.truncate())
721+
TraceThread thread = threadService.getById(projectId, identifier.threadId(), identifier.truncate())
720722
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
721723
.block();
722724

@@ -877,7 +879,7 @@ public Response getThreadStats(@QueryParam("project_id") UUID projectId,
877879

878880
log.info("Get trace thread stats by '{}' on workspaceId '{}'", searchCriteria, workspaceId);
879881

880-
ProjectStats projectStats = service.getThreadStats(searchCriteria)
882+
ProjectStats projectStats = threadService.getStats(searchCriteria)
881883
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
882884
.block();
883885

apps/opik-backend/src/main/java/com/comet/opik/domain/ThreadDAO.java

Lines changed: 1385 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.comet.opik.domain;
2+
3+
import com.comet.opik.api.ProjectStats;
4+
import com.comet.opik.api.TraceThread;
5+
import com.comet.opik.api.sorting.TraceThreadSortingFactory;
6+
import com.google.inject.ImplementedBy;
7+
import io.opentelemetry.instrumentation.annotations.WithSpan;
8+
import jakarta.inject.Inject;
9+
import jakarta.inject.Singleton;
10+
import lombok.NonNull;
11+
import lombok.RequiredArgsConstructor;
12+
import lombok.extern.slf4j.Slf4j;
13+
import reactor.core.publisher.Flux;
14+
import reactor.core.publisher.Mono;
15+
16+
import java.util.UUID;
17+
18+
import static com.comet.opik.utils.ErrorUtils.failWithNotFound;
19+
20+
@ImplementedBy(ThreadServiceImpl.class)
21+
public interface ThreadService {
22+
23+
Mono<TraceThread.TraceThreadPage> find(int page, int size, TraceSearchCriteria criteria);
24+
25+
Mono<TraceThread> getById(UUID projectId, String threadId, boolean truncate);
26+
27+
Flux<TraceThread> search(int limit, @NonNull TraceSearchCriteria criteria);
28+
29+
Mono<ProjectStats> getStats(TraceSearchCriteria searchCriteria);
30+
}
31+
32+
@Slf4j
33+
@Singleton
34+
@RequiredArgsConstructor(onConstructor_ = @Inject)
35+
class ThreadServiceImpl implements ThreadService {
36+
37+
private final @NonNull ThreadDAO dao;
38+
private final @NonNull ProjectService projectService;
39+
private final @NonNull TraceThreadSortingFactory traceThreadSortingFactory;
40+
41+
@Override
42+
public Mono<TraceThread.TraceThreadPage> find(int page, int size, @NonNull TraceSearchCriteria criteria) {
43+
return findProjectAndVerifyVisibility(criteria)
44+
.flatMap(it -> dao.find(size, page, it))
45+
.switchIfEmpty(Mono
46+
.just(TraceThread.TraceThreadPage.empty(page, traceThreadSortingFactory.getSortableFields())));
47+
}
48+
49+
@Override
50+
public Mono<TraceThread> getById(@NonNull UUID projectId, @NonNull String threadId, boolean truncate) {
51+
return dao.findById(projectId, threadId, truncate)
52+
.switchIfEmpty(Mono.defer(() -> Mono.error(failWithNotFound("Trace Thread", threadId))));
53+
}
54+
55+
@Override
56+
public Flux<TraceThread> search(int limit, @NonNull TraceSearchCriteria criteria) {
57+
return findProjectAndVerifyVisibility(criteria)
58+
.flatMapMany(it -> dao.search(limit, it));
59+
}
60+
61+
@Override
62+
@WithSpan
63+
public Mono<ProjectStats> getStats(@NonNull TraceSearchCriteria criteria) {
64+
return findProjectAndVerifyVisibility(criteria)
65+
.flatMap(dao::getThreadStats)
66+
.switchIfEmpty(Mono.just(ProjectStats.empty()));
67+
}
68+
69+
private Mono<TraceSearchCriteria> findProjectAndVerifyVisibility(TraceSearchCriteria criteria) {
70+
return projectService.resolveProjectIdAndVerifyVisibility(criteria.projectId(), criteria.projectName())
71+
.map(projectId -> criteria.toBuilder()
72+
.projectId(projectId)
73+
.build());
74+
}
75+
}

0 commit comments

Comments
 (0)