Skip to content

Commit 384c108

Browse files
committed
fix 一括生成で止まらないようにする
1 parent 11cfe69 commit 384c108

4 files changed

Lines changed: 36 additions & 12 deletions

File tree

backend/cmd/subscriber/main.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/prometheus/client_golang/prometheus/promhttp"
2424
"github.com/redis/rueidis"
2525
"github.com/sirupsen/logrus"
26+
"golang.org/x/time/rate"
2627
)
2728

2829
// getTaskTimeout 環境変数からタスクタイムアウトを取得(デフォルト600秒)
@@ -167,6 +168,10 @@ func runSubscriber(app *container.SubscriberApp, cleanup *container.Cleanup, log
167168
}
168169
}()
169170

171+
// Gemini API レートリミッター(3000 RPM = 50 RPS、バースト50)
172+
// チャンク分割・embedding生成の両方に適用してレートリミット超過を防ぐ
173+
geminiRateLimiter := rate.NewLimiter(rate.Every(time.Minute/3000), 50)
174+
170175
logger.WithField("max_concurrent_jobs", app.SubscriberConfig.MaxConcurrentJobs).Info("Subscriber is listening for messages...")
171176

172177
// Create context for subscription that can be cancelled
@@ -239,7 +244,7 @@ func runSubscriber(app *container.SubscriberApp, cleanup *container.Cleanup, log
239244
}()
240245

241246
start := time.Now()
242-
err := processMessage(subCtx, app.DB, app.Redis, app.LLMFactory, app.LockService, msg.Message, logger)
247+
err := processMessage(subCtx, app.DB, app.Redis, app.LLMFactory, app.LockService, geminiRateLimiter, msg.Message, logger)
243248
duration := time.Since(start)
244249

245250
// メトリクス更新は processMessage 内で行う
@@ -336,7 +341,7 @@ func runSubscriber(app *container.SubscriberApp, cleanup *container.Cleanup, log
336341
return nil
337342
}
338343

339-
func processMessage(ctx context.Context, db *sql.DB, redisClient rueidis.Client, llmFactory container.LLMClientFactory, lockService container.LockService, payload string, logger *logrus.Entry) error {
344+
func processMessage(ctx context.Context, db *sql.DB, redisClient rueidis.Client, llmFactory container.LLMClientFactory, lockService container.LockService, geminiRateLimiter *rate.Limiter, payload string, logger *logrus.Entry) error {
340345
start := time.Now()
341346

342347
// まずメッセージタイプを確認
@@ -413,7 +418,7 @@ func processMessage(ctx context.Context, db *sql.DB, redisClient rueidis.Client,
413418
messagesProcessedCounter.WithLabelValues("diary_embedding", "error").Inc()
414419
return fmt.Errorf("failed to unmarshal diary embedding message: %w", unmarshalErr)
415420
}
416-
err = generateDiaryEmbedding(ctx, db, llmFactory, message.UserID, message.DiaryID, logger)
421+
err = generateDiaryEmbedding(ctx, db, llmFactory, geminiRateLimiter, message.UserID, message.DiaryID, logger)
417422
if err != nil {
418423
messagesProcessedCounter.WithLabelValues("diary_embedding", "error").Inc()
419424
} else {
@@ -1091,7 +1096,7 @@ func generateDiaryHighlightWithLLM(ctx context.Context, db *sql.DB, llmFactory c
10911096
return highlights, nil
10921097
}
10931098

1094-
func generateDiaryEmbedding(ctx context.Context, db *sql.DB, llmFactory container.LLMClientFactory, userID, diaryID string, logger *logrus.Entry) error {
1099+
func generateDiaryEmbedding(ctx context.Context, db *sql.DB, llmFactory container.LLMClientFactory, geminiRateLimiter *rate.Limiter, userID, diaryID string, logger *logrus.Entry) error {
10951100
logger.WithFields(logrus.Fields{
10961101
"user_id": userID,
10971102
"diary_id": diaryID,
@@ -1140,6 +1145,9 @@ func generateDiaryEmbedding(ctx context.Context, db *sql.DB, llmFactory containe
11401145
}()
11411146

11421147
// 4. 日記を話題ごとのチャンクに分割する(失敗時は日記全体を1チャンクとしてフォールバック)
1148+
if err := geminiRateLimiter.Wait(ctx); err != nil {
1149+
return fmt.Errorf("rate limiter cancelled before SplitDiaryIntoChunks: %w", err)
1150+
}
11431151
chunkDataList, err := geminiClient.SplitDiaryIntoChunks(ctx, diaryContent)
11441152
if err != nil {
11451153
logger.WithFields(logrus.Fields{
@@ -1172,6 +1180,10 @@ func generateDiaryEmbedding(ctx context.Context, db *sql.DB, llmFactory containe
11721180
defer embWg.Done()
11731181
// 時間的クエリの精度向上のため日付情報を先頭に付与する
11741182
enrichedChunk := fmt.Sprintf("%d年%d月%d日の日記:\n%s", diaryDate.Year(), int(diaryDate.Month()), diaryDate.Day(), cd.Content)
1183+
if waitErr := geminiRateLimiter.Wait(ctx); waitErr != nil {
1184+
embResults[idx] = embeddingResult{err: fmt.Errorf("rate limiter cancelled before GenerateEmbedding for chunk %d: %w", idx, waitErr)}
1185+
return
1186+
}
11751187
embedding, err := geminiClient.GenerateEmbedding(ctx, enrichedChunk, true)
11761188
if err != nil {
11771189
embResults[idx] = embeddingResult{err: fmt.Errorf("failed to generate embedding for chunk %d: %w", idx, err)}

backend/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ require (
267267
golang.org/x/sync v0.20.0 // indirect
268268
golang.org/x/sys v0.42.0 // indirect
269269
golang.org/x/text v0.35.0 // indirect
270+
golang.org/x/time v0.15.0 // indirect
270271
golang.org/x/tools v0.42.0 // indirect
271272
golang.org/x/tools/go/expect v0.1.1-deprecated // indirect
272273
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated // indirect

backend/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
921921
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
922922
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
923923
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
924+
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
925+
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
924926
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
925927
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
926928
golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=

backend/service/diary/service.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,8 +1223,23 @@ func (s *DiaryEntry) RegenerateAllEmbeddings(
12231223
return nil, status.Error(codes.Internal, "Redis not configured")
12241224
}
12251225

1226-
// 同一ユーザーによる同時実行を防ぐ分散ロック(TTL: 10分)
1227-
regenLock := lock.NewDistributedLock(s.Redis, lock.EmbeddingRegenLockKey(userIDStr), 10*time.Minute)
1226+
// embedding未生成の日記ID一覧を取得(ロック取得前に件数を確認してTTLを決定する)
1227+
diaryIDs, err := database.DiaryIDsWithoutEmbeddings(ctx, s.DB, userID)
1228+
if err != nil {
1229+
return nil, status.Errorf(codes.Internal, "Failed to query diaries: %v", err)
1230+
}
1231+
1232+
// 件数に応じてロックTTLを動的に計算する(1件あたり10秒、最小10分・最大24時間)
1233+
lockTTL := time.Duration(len(diaryIDs)) * 10 * time.Second
1234+
if lockTTL < 10*time.Minute {
1235+
lockTTL = 10 * time.Minute
1236+
}
1237+
if lockTTL > 24*time.Hour {
1238+
lockTTL = 24 * time.Hour
1239+
}
1240+
1241+
// 同一ユーザーによる同時実行を防ぐ分散ロック
1242+
regenLock := lock.NewDistributedLock(s.Redis, lock.EmbeddingRegenLockKey(userIDStr), lockTTL)
12281243
acquired, err := regenLock.TryLock(ctx)
12291244
if err != nil {
12301245
return nil, status.Errorf(codes.Internal, "Failed to acquire lock: %v", err)
@@ -1238,12 +1253,6 @@ func (s *DiaryEntry) RegenerateAllEmbeddings(
12381253
}
12391254
}()
12401255

1241-
// embedding未生成の日記ID一覧を取得
1242-
diaryIDs, err := database.DiaryIDsWithoutEmbeddings(ctx, s.DB, userID)
1243-
if err != nil {
1244-
return nil, status.Errorf(codes.Internal, "Failed to query diaries: %v", err)
1245-
}
1246-
12471256
// 各日記のembedding生成メッセージをキューに追加
12481257
// 手動再生成のため当日の日記も即時処理する(on-saveとは異なる)
12491258
var count int32

0 commit comments

Comments
 (0)