Commit 810bfb5
[OPIK-2770] [BE] Add optimization log sync service with Redis to S3 streaming (#4539)
* [OPIK-2770] [BE] Add optimization log sync service with Redis to S3 streaming
  - Add OptimizationLogSyncService for syncing logs from Redis to S3
  - Use Flux.interval() with Managed lifecycle (no Quartz dependency)
  - Implement per-optimization distributed locking to prevent duplicates
  - Gzip compress logs before S3 upload
  - S3 key pattern: logs/optimization-studio/{workspace_id}/{id}.log.gz
  - Use KeysScanOptions for Redis key scanning (non-deprecated API)
  - Add syncConcurrency config for parallel S3 uploads
  - Trigger final sync on optimization completion via finalizeLogsOnCompletion
  - Reduce Redis TTL to 1 hour after completion for cleanup

  Demo: https://grid-comet-ml.enterprise.slack.com/files/U06VAFE2L8J/F0A54BNP34M/screenshare_-_2025-12-23_10_49_10_am.mp4

* Fix test to use getLockTimeout() instead of getLockTtlSeconds()

* Fix: Only preserve name on upsert if incoming name is blank

  The SDK sends blank names during status updates, but explicit name updates via upsert should be honored.

* Address Baz review feedback
  - Fix: Update last_flush_ts in doSyncToS3AndReduceTTL to prevent repeated uploads
  - Fix: Log full exception instead of just message in finalizeLogsAsync
  - Fix: Log full exception instead of just message in OptimizationLogFlusherJob
  - Remove unused deleteRedisKeys method
  - Test: Add verification for last_flush_ts update in finalization test

* Revision 3: Address Ido's review - DRY refactor and shared META_KEY_PATTERN
1 parent 39588bb commit 810bfb5

File tree: 9 files changed (+877, -56 lines)

apps/opik-backend/config.yml

Lines changed: 15 additions & 0 deletions
@@ -419,6 +419,21 @@ onlineScoring:
   pendingMessageDuration: ${REDIS_SCORING_SPAN_USER_DEFINED_METRIC_PYTHON_PENDING_MESSAGE_DURATION:-}
   maxRetries: ${REDIS_SCORING_SPAN_USER_DEFINED_METRIC_PYTHON_MAX_RETRIES:-}

+# Configuration for Optimization Studio logs synchronization
+optimizationLogs:
+  # Default: true
+  # Description: Whether log synchronization from Redis to S3 is enabled
+  enabled: ${OPTIMIZATION_LOGS_ENABLED:-true}
+  # Default: 2m
+  # Description: Interval between log sync runs
+  syncInterval: ${OPTIMIZATION_LOGS_SYNC_INTERVAL:-2m}
+  # Default: 30s
+  # Description: Lock timeout for per-optimization distributed lock during sync
+  lockTimeout: ${OPTIMIZATION_LOGS_LOCK_TIMEOUT:-30s}
+  # Default: 5
+  # Description: Maximum number of concurrent S3 uploads during sync
+  syncConcurrency: ${OPTIMIZATION_LOGS_SYNC_CONCURRENCY:-5}

 # LLM providers client configuration
 llmProviderClient:
   # Default: 3
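
Note: the OptimizationLogsConfig class that binds this block is not part of the diff shown on this page. Below is a minimal sketch of a shape that would satisfy the getters used elsewhere in this commit (isEnabled(), getSyncInterval(), getLockTimeout(), getSyncConcurrency()); the field types are assumptions, using Dropwizard's io.dropwizard.util.Duration to parse the humanized "2m"/"30s" values.

package com.comet.opik.infrastructure;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.dropwizard.util.Duration;
import lombok.Data;

// Hypothetical sketch; the real class in this commit may differ in types and defaults.
@Data
public class OptimizationLogsConfig {

    // Whether log synchronization from Redis to S3 is enabled
    @JsonProperty
    private boolean enabled = true;

    // Interval between log sync runs (parsed from values like "2m")
    @JsonProperty
    private Duration syncInterval = Duration.minutes(2);

    // Lock timeout for the per-optimization distributed lock during sync
    @JsonProperty
    private Duration lockTimeout = Duration.seconds(30);

    // Maximum number of concurrent S3 uploads during sync
    @JsonProperty
    private int syncConcurrency = 5;
}

Flux.interval() needs a java.time.Duration, so either the real class exposes that type directly or call sites convert via toJavaDuration().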

apps/opik-backend/src/main/java/com/comet/opik/api/OptimizationStatus.java

Lines changed: 11 additions & 5 deletions
@@ -10,15 +10,21 @@
 @Getter
 @RequiredArgsConstructor
 public enum OptimizationStatus {
-    RUNNING("running"),
-    COMPLETED("completed"),
-    CANCELLED("cancelled"),
-    INITIALIZED("initialized"),
-    ERROR("error");
+    RUNNING("running", false),
+    COMPLETED("completed", true),
+    CANCELLED("cancelled", true),
+    INITIALIZED("initialized", false),
+    ERROR("error", true);

     @JsonValue
     private final String value;

+    /**
+     * Indicates whether this status represents a terminal (final) state.
+     * Terminal statuses trigger log finalization and cleanup.
+     */
+    private final boolean terminal;
+
     @JsonCreator
     public static OptimizationStatus fromString(String value) {
         return Arrays.stream(values())
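
Since the enum carries @Getter, the new boolean surfaces as isTerminal(); the update path in OptimizationService below is its consumer. A trivial illustration, assuming the Opik classes are on the classpath:

import com.comet.opik.api.OptimizationStatus;

// Quick check of the new flag (Lombok @Getter generates isTerminal()).
public class TerminalStatusDemo {
    public static void main(String[] args) {
        System.out.println(OptimizationStatus.COMPLETED.isTerminal()); // true
        System.out.println(OptimizationStatus.RUNNING.isTerminal());   // false
    }
}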

apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/jobs/OptimizationLogFlusherJob.java

Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
+package com.comet.opik.api.resources.v1.jobs;
+
+import com.comet.opik.domain.optimization.OptimizationLogSyncService;
+import com.comet.opik.infrastructure.OptimizationLogsConfig;
+import io.dropwizard.lifecycle.Managed;
+import jakarta.inject.Inject;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RedissonReactiveClient;
+import org.redisson.api.options.KeysScanOptions;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+import ru.vyarus.dropwizard.guice.module.installer.feature.eager.EagerSingleton;
+import ru.vyarus.dropwizard.guice.module.yaml.bind.Config;
+
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Managed service that periodically syncs optimization logs from Redis to S3.
+ * <p>
+ * Uses Flux.interval() for scheduling instead of Quartz, similar to BaseRedisSubscriber.
+ * Each optimization has its own distributed lock to prevent duplicate S3 uploads across instances.
+ * <p>
+ * Redis key pattern scanned: opik:logs:*:meta
+ */
+@Slf4j
+@EagerSingleton
+public class OptimizationLogFlusherJob implements Managed {
+
+    // Pattern to extract workspace_id and optimization_id from meta key
+    // opik:logs:{workspace_id}:{optimization_id}:meta
+    private static final Pattern META_KEY_REGEX = Pattern.compile("opik:logs:([^:]+):([^:]+):meta");
+
+    private final RedissonReactiveClient redisClient;
+    private final OptimizationLogSyncService logSyncService;
+    private final OptimizationLogsConfig config;
+
+    private volatile Disposable subscription;
+    private volatile Scheduler timerScheduler;
+
+    @Inject
+    public OptimizationLogFlusherJob(
+            @NonNull RedissonReactiveClient redisClient,
+            @NonNull OptimizationLogSyncService logSyncService,
+            @NonNull @Config("optimizationLogs") OptimizationLogsConfig config) {
+        this.redisClient = redisClient;
+        this.logSyncService = logSyncService;
+        this.config = config;
+    }
+
+    @Override
+    public void start() {
+        if (!config.isEnabled()) {
+            log.info("Optimization log flusher is disabled");
+            return;
+        }
+
+        if (timerScheduler == null) {
+            timerScheduler = Schedulers.newSingle("optimization-log-flusher-timer", true);
+        }
+
+        if (subscription == null) {
+            subscription = Flux.interval(config.getSyncInterval(), timerScheduler)
+                    .onBackpressureDrop(tick -> log.debug("Backpressure drop, tick '{}'", tick))
+                    .concatMap(tick -> scanAndSyncLogs())
+                    .subscribe(
+                            __ -> {
+                            },
+                            error -> log.error("Optimization log flusher failed", error));
+
+            log.info("Optimization log flusher started with interval '{}'", config.getSyncInterval());
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (subscription != null && !subscription.isDisposed()) {
+            subscription.dispose();
+            log.info("Optimization log flusher stopped");
+        }
+        if (timerScheduler != null && !timerScheduler.isDisposed()) {
+            timerScheduler.dispose();
+        }
+    }
+
+    /**
+     * Scan Redis for optimization log meta keys and sync each one.
+     */
+    private Mono<Void> scanAndSyncLogs() {
+        var scanOptions = KeysScanOptions.defaults().pattern(OptimizationLogSyncService.META_KEY_PATTERN);
+        return redisClient.getKeys().getKeys(scanOptions)
+                .collectList()
+                .flatMap(keys -> {
+                    if (keys.isEmpty()) {
+                        log.debug("No optimization log keys found");
+                        return Mono.empty();
+                    }
+
+                    log.info("Found '{}' optimization log keys to check", keys.size());
+
+                    return Flux.fromIterable(keys)
+                            .flatMap(this::syncLogForMetaKey, config.getSyncConcurrency())
+                            .onErrorContinue((error, key) -> log.warn("Failed to sync logs for key '{}'",
+                                    key, error))
+                            .then();
+                });
+    }
+
+    /**
+     * Parse meta key and trigger sync for the optimization.
+     */
+    private Mono<Void> syncLogForMetaKey(String metaKey) {
+        Matcher matcher = META_KEY_REGEX.matcher(metaKey);
+        if (!matcher.matches()) {
+            log.warn("Invalid meta key format: '{}'", metaKey);
+            return Mono.empty();
+        }
+
+        String workspaceId = matcher.group(1);
+        String optimizationIdStr = matcher.group(2);
+
+        try {
+            UUID optimizationId = UUID.fromString(optimizationIdStr);
+            return logSyncService.syncLogsToS3(workspaceId, optimizationId);
+        } catch (IllegalArgumentException e) {
+            log.warn("Invalid optimization ID in meta key '{}': '{}'", metaKey, optimizationIdStr);
+            return Mono.empty();
+        }
+    }
+}
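
OptimizationLogSyncService itself is not reproduced on this page, but the commit message pins down its behavior: a per-optimization Redisson lock, gzip compression before upload, and the S3 key pattern logs/optimization-studio/{workspace_id}/{id}.log.gz. A hedged sketch of what syncLogsToS3 might look like follows; the lock key and the readPendingLogs/uploadToS3 helpers are invented for illustration and are not the real service's API.

package com.comet.opik.domain.optimization;

import org.redisson.api.RLockReactive;
import org.redisson.api.RedissonReactiveClient;
import reactor.core.publisher.Mono;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch of the flow described in the commit message, NOT the committed class.
public class OptimizationLogSyncServiceSketch {

    // Shared scan pattern referenced by OptimizationLogFlusherJob (from the class javadoc above).
    public static final String META_KEY_PATTERN = "opik:logs:*:meta";

    private final RedissonReactiveClient redisClient;
    private final long lockTimeoutMs;

    OptimizationLogSyncServiceSketch(RedissonReactiveClient redisClient, long lockTimeoutMs) {
        this.redisClient = redisClient;
        this.lockTimeoutMs = lockTimeoutMs;
    }

    public Mono<Void> syncLogsToS3(String workspaceId, UUID optimizationId) {
        // Assumed lock key; the point is one lock per optimization, so two backend
        // instances never upload the same logs concurrently.
        RLockReactive lock = redisClient.getLock("opik:logs:lock:" + workspaceId + ":" + optimizationId);

        return lock.tryLock(0, lockTimeoutMs, TimeUnit.MILLISECONDS)
                .flatMap(acquired -> {
                    if (!acquired) {
                        return Mono.empty(); // another instance holds the lock; skip this cycle
                    }
                    String s3Key = "logs/optimization-studio/%s/%s.log.gz"
                            .formatted(workspaceId, optimizationId);
                    return readPendingLogs(workspaceId, optimizationId)
                            .map(OptimizationLogSyncServiceSketch::gzip)
                            .flatMap(bytes -> uploadToS3(s3Key, bytes))
                            .then(lock.unlock());
                });
    }

    // Gzip-compress the concatenated log lines before upload.
    private static byte[] gzip(String logs) {
        var buffer = new ByteArrayOutputStream();
        try (var gzipStream = new GZIPOutputStream(buffer)) {
            gzipStream.write(logs.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Placeholders standing in for the real Redis read and S3 PutObject calls.
    private Mono<String> readPendingLogs(String workspaceId, UUID optimizationId) {
        return Mono.empty();
    }

    private Mono<Void> uploadToS3(String key, byte[] gzippedBytes) {
        return Mono.empty();
    }
}

Using tryLock with a zero wait time makes the periodic scan skip, rather than queue behind, an optimization that another instance is already flushing.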

apps/opik-backend/src/main/java/com/comet/opik/domain/OptimizationService.java

Lines changed: 102 additions & 51 deletions
@@ -10,6 +10,7 @@
 import com.comet.opik.api.events.OptimizationCreated;
 import com.comet.opik.api.events.OptimizationsDeleted;
 import com.comet.opik.domain.attachment.PreSignerService;
+import com.comet.opik.domain.optimization.OptimizationLogSyncService;
 import com.comet.opik.infrastructure.OpikConfiguration;
 import com.comet.opik.infrastructure.auth.RequestContext;
 import com.comet.opik.infrastructure.queues.Queue;
@@ -77,6 +78,7 @@ class OptimizationServiceImpl implements OptimizationService {
     private final @NonNull QueueProducer queueProducer;
     private final @NonNull WorkspaceNameService workspaceNameService;
     private final @NonNull OpikConfiguration config;
+    private final @NonNull OptimizationLogSyncService logSyncService;

     @Override
     @WithSpan
@@ -110,61 +112,91 @@
     public Mono<UUID> upsert(@NonNull Optimization optimization) {
         UUID id = optimization.id() == null ? idGenerator.generateId() : optimization.id();
         IdGenerator.validateVersion(id, "Optimization");
-        var name = StringUtils.getIfBlank(optimization.name(), nameGenerator::generateName);

-        // Detect if this is a Studio optimization
+        // Detect if this is a Studio optimization (has studioConfig in the request)
         boolean isStudioOptimization = optimization.studioConfig() != null;

         return datasetService.getOrCreateDataset(optimization.datasetName())
-                .flatMap(datasetId -> {
-                    var builder = optimization.toBuilder()
-                            .id(id)
-                            .name(name)
-                            .datasetId(datasetId);
-
-                    // Force INITIALIZED status for Studio optimizations
-                    if (isStudioOptimization) {
-                        builder.status(OptimizationStatus.INITIALIZED);
-                        log.info("Force INITIALIZED (was '{}') status for Studio optimization id '{}'",
-                                optimization.status(), id);
-                    }
-
-                    var newOptimization = builder.build();
-
-                    return makeMonoContextAware((userName, workspaceId) -> Mono.deferContextual(ctx -> {
-
-                        return optimizationDAO.upsert(newOptimization)
-                                .thenReturn(newOptimization.id())
-                                // The event is posted only when the experiment is successfully created.
-                                .doOnSuccess(experimentId -> {
-                                    postOptimizationCreatedEvent(newOptimization, workspaceId, userName);
-
-                                    // If Studio optimization, enqueue job to Redis RQ
-                                    if (isStudioOptimization) {
-                                        String workspaceName = ctx.getOrDefault(RequestContext.WORKSPACE_NAME, null);
-                                        if (StringUtils.isBlank(workspaceName)) {
-                                            try {
-                                                workspaceName = workspaceNameService.getWorkspaceName(workspaceId,
-                                                        config.getAuthentication().getReactService().url());
-                                            } catch (Exception e) {
-                                                log.warn(
-                                                        "Failed to get workspace name for workspaceId '{}', using workspaceId as name: {}",
-                                                        workspaceId, e.getMessage());
-                                                workspaceName = workspaceId;
-                                            }
-                                        }
-
-                                        String opikApiKey = newOptimization.studioConfig() != null
-                                                ? newOptimization.studioConfig().opikApiKey()
-                                                : null;
+                .flatMap(datasetId -> makeMonoContextAware((userName, workspaceId) -> Mono.deferContextual(ctx -> {
+
+                    // Check if optimization already exists to preserve certain fields
+                    return optimizationDAO.getById(id)
+                            .map(Optional::of)
+                            .defaultIfEmpty(Optional.empty())
+                            .flatMap(existingOpt -> {
+                                var builder = optimization.toBuilder()
+                                        .id(id)
+                                        .datasetId(datasetId);
+
+                                // Preserve existing fields when updating (SDK doesn't know about studioConfig)
+                                if (existingOpt.isPresent()) {
+                                    var existing = existingOpt.get();
+                                    log.info("Optimization '{}' already exists, preserving studioConfig", id);
+
+                                    // Preserve studioConfig if not provided in update
+                                    if (optimization.studioConfig() == null && existing.studioConfig() != null) {
+                                        builder.studioConfig(existing.studioConfig());
+                                    }

-                                        enqueueStudioOptimizationJob(newOptimization, workspaceId, workspaceName,
-                                                opikApiKey);
+                                    // Preserve original name only if incoming name is blank
+                                    // (SDK sends blank name, but explicit updates should be honored)
+                                    if (StringUtils.isBlank(optimization.name())) {
+                                        builder.name(existing.name());
+                                    } else {
+                                        builder.name(optimization.name());
                                     }
-                                });
-                    }))
-                    .subscribeOn(Schedulers.boundedElastic());
-                })
+
+                                    // Don't re-enqueue job for existing optimizations
+                                } else {
+                                    // New optimization: generate name if not provided
+                                    var name = StringUtils.getIfBlank(optimization.name(),
+                                            nameGenerator::generateName);
+                                    builder.name(name);
+                                }
+
+                                // Force INITIALIZED status for NEW Studio optimizations only
+                                if (isStudioOptimization && existingOpt.isEmpty()) {
+                                    builder.status(OptimizationStatus.INITIALIZED);
+                                    log.info("Force INITIALIZED (was '{}') status for NEW Studio optimization id '{}'",
+                                            optimization.status(), id);
+                                }
+
+                                var newOptimization = builder.build();
+                                boolean shouldEnqueueJob = isStudioOptimization && existingOpt.isEmpty();
+
+                                return optimizationDAO.upsert(newOptimization)
+                                        .thenReturn(newOptimization.id())
+                                        .doOnSuccess(__ -> {
+                                            postOptimizationCreatedEvent(newOptimization, workspaceId, userName);
+
+                                            // Only enqueue job for NEW Studio optimizations
+                                            if (shouldEnqueueJob) {
+                                                String workspaceName = ctx.getOrDefault(RequestContext.WORKSPACE_NAME,
+                                                        null);
+                                                if (StringUtils.isBlank(workspaceName)) {
+                                                    try {
+                                                        workspaceName = workspaceNameService.getWorkspaceName(
+                                                                workspaceId,
+                                                                config.getAuthentication().getReactService().url());
+                                                    } catch (Exception e) {
+                                                        log.warn(
+                                                                "Failed to get workspace name for workspaceId '{}', using workspaceId as name: {}",
+                                                                workspaceId, e.getMessage());
+                                                        workspaceName = workspaceId;
+                                                    }
+                                                }
+
+                                                String opikApiKey = newOptimization.studioConfig() != null
+                                                        ? newOptimization.studioConfig().opikApiKey()
+                                                        : null;
+
+                                                enqueueStudioOptimizationJob(newOptimization, workspaceId,
+                                                        workspaceName, opikApiKey);
+                                            }
+                                        });
+                            });
+                }))
+                .subscribeOn(Schedulers.boundedElastic()))
         // If a conflict occurs, we just return the id of the existing experiment.
         // If any other error occurs, we throw it. The event is not posted for both cases.
         .onErrorResume(throwable -> handleCreateError(throwable, id));
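
The behavioral fix worth calling out in this hunk is the name rule from the commit message ("Only preserve name on upsert if incoming name is blank"). Distilled into a standalone snippet for clarity (an illustration, not code from this commit):

import org.apache.commons.lang3.StringUtils;

// Blank incoming names (what the SDK sends on status updates) keep the stored
// name; explicit names win. Same decision as the builder.name(...) branch above.
final class UpsertNameRule {

    static String resolveName(String incomingName, String existingName) {
        return StringUtils.isBlank(incomingName) ? existingName : incomingName;
    }

    public static void main(String[] args) {
        System.out.println(resolveName("", "my-optimization"));        // my-optimization (preserved)
        System.out.println(resolveName("renamed", "my-optimization")); // renamed (explicit update honored)
    }
}
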
@@ -202,7 +234,25 @@ public Mono<Long> update(@NonNull UUID id, @NonNull OptimizationUpdate update) {

     return optimizationDAO.getById(id)
             .switchIfEmpty(Mono.error(failWithNotFound("Optimization", id)))
-            .then(Mono.defer(() -> optimizationDAO.update(id, update)));
+            .flatMap(optimization -> Mono.deferContextual(ctx -> {
+                String workspaceId = ctx.get(RequestContext.WORKSPACE_ID);
+
+                return optimizationDAO.update(id, update)
+                        .doOnSuccess(result -> {
+                            // Sync logs when optimization reaches terminal status
+                            // Safe to call multiple times - just syncs and reduces TTL
+                            if (update.status() != null && update.status().isTerminal()) {
+                                finalizeLogsAsync(workspaceId, id);
+                            }
+                        });
+            }));
+}
+
+private void finalizeLogsAsync(String workspaceId, UUID optimizationId) {
+    logSyncService.finalizeLogsOnCompletion(workspaceId, optimizationId)
+            .doOnError(error -> log.error("Failed to finalize logs for optimization '{}'",
+                    optimizationId, error))
+            .subscribe();
 }

 @Override
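
finalizeLogsOnCompletion is also outside this page's excerpt. Per the commit notes it runs one last sync, stamps last_flush_ts on the meta hash (so later scans don't re-upload logs already in S3), and drops the Redis TTL to one hour for cleanup. A hypothetical continuation of the sync-service sketch shown earlier; the meta-key field name and Redisson calls sketch the intent rather than the committed code:

import org.redisson.api.RMapReactive;
import org.redisson.api.RedissonReactiveClient;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.UUID;

// Continuation of the hypothetical OptimizationLogSyncServiceSketch above.
class LogFinalizationSketch {

    private final RedissonReactiveClient redisClient;
    private final OptimizationLogSyncServiceSketch syncService;

    LogFinalizationSketch(RedissonReactiveClient redisClient, OptimizationLogSyncServiceSketch syncService) {
        this.redisClient = redisClient;
        this.syncService = syncService;
    }

    public Mono<Void> finalizeLogsOnCompletion(String workspaceId, UUID optimizationId) {
        String metaKey = "opik:logs:%s:%s:meta".formatted(workspaceId, optimizationId);
        RMapReactive<String, String> meta = redisClient.getMap(metaKey);

        return syncService.syncLogsToS3(workspaceId, optimizationId)
                // "last_flush_ts" comes from the review notes; stamping it keeps the
                // periodic scan from re-uploading logs that are already in S3.
                .then(meta.fastPut("last_flush_ts", String.valueOf(System.currentTimeMillis())))
                // Commit message: "Reduce Redis TTL to 1 hour after completion for cleanup".
                .then(meta.expire(Duration.ofHours(1)))
                .then();
    }
}
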
@@ -251,9 +301,10 @@ private void enqueueStudioOptimizationJob(Optimization optimization, String work
     log.info("Enqueuing Optimization Studio job for id: '{}', workspace: '{}' (name: '{}')",
             optimization.id(), workspaceId, workspaceName);

-    // Build job message (use workspace name for SDK)
+    // Build job message (use workspace name for SDK, workspace ID for log storage)
     var jobMessage = OptimizationStudioJobMessage.builder()
             .optimizationId(optimization.id())
+            .workspaceId(workspaceId)
             .workspaceName(workspaceName)
             .config(optimization.studioConfig())
             .opikApiKey(opikApiKey)

apps/opik-backend/src/main/java/com/comet/opik/domain/OptimizationStudioJobMessage.java

Lines changed: 2 additions & 0 deletions
@@ -15,11 +15,13 @@
  * information to execute an optimization job using the Opik SDK.
  * <p>
  * Note: The Python SDK requires workspace NAME (not ID) for authentication.
+ * The workspace ID is also provided for use in log storage paths.
  */
 @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
 @Builder(toBuilder = true)
 public record OptimizationStudioJobMessage(
         @NonNull UUID optimizationId,
+        @NonNull String workspaceId,
         @NonNull String workspaceName,
         @NonNull OptimizationStudioConfig config,
         String opikApiKey) {
