Skip to content

Commit 8d67d2a

Browse files
committed
fix mainにSQL書いているの良くなさすぎるので分離する
1 parent bcdb58f commit 8d67d2a

8 files changed

Lines changed: 560 additions & 186 deletions

File tree

CLAUDE.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,13 @@ Scheduler (5min interval) → Redis Pub/Sub → Subscriber → LLM APIs → Data
331331

332332
## Development Guidelines
333333

334+
### Database Access Guidelines
335+
336+
- **Never write inline SQL in `cmd/` packages**: All SQL queries must be defined in `backend/infrastructure/database/` functions. `cmd/scheduler` and `cmd/subscriber` must call these functions instead of using `QueryContext`/`QueryRowContext`/`ExecContext` directly.
337+
- **One file per query domain**: Place queries in the appropriate file (e.g., `user_llms.go` for user_llms queries, `scheduler_queries.go` for cross-table scheduler queries).
338+
- **Every new database file needs a `*_test.go`**: When adding query functions to `backend/infrastructure/database/`, always create matching tests in `package database_test`.
339+
- **Type-aware comparisons**: `diaries.updated_at` is BIGINT (milliseconds), while `diary_embeddings.updated_at` is TIMESTAMP. Use `to_timestamp(d.updated_at / 1000.0)` for cross-table comparisons.
340+
334341
### Port Usage
335342

336343
- **Always use 2000 series ports**: All services must use ports in the 2000-2099 range

backend/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
/tmp
22
server
3+
subscriber
4+
scheduler

backend/cmd/scheduler/main.go

Lines changed: 10 additions & 186 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/project-mikan/umi.mikan/backend/constants"
1515
"github.com/project-mikan/umi.mikan/backend/container"
16+
"github.com/project-mikan/umi.mikan/backend/infrastructure/database"
1617
"github.com/prometheus/client_golang/prometheus"
1718
"github.com/prometheus/client_golang/prometheus/promhttp"
1819
"github.com/redis/rueidis"
@@ -333,30 +334,10 @@ func (j *DailySummaryJob) Execute(ctx context.Context, s *Scheduler) error {
333334
s.logger.Info("Checking for missing daily summaries...")
334335

335336
// 1. auto_summary_daily が true のユーザーを取得
336-
usersQuery := `
337-
SELECT user_id
338-
FROM user_llms
339-
WHERE auto_summary_daily = true
340-
`
341-
342-
rows, err := s.db.QueryContext(ctx, usersQuery)
337+
userIDs, err := database.UserIDsWithAutoSummaryDaily(ctx, s.db)
343338
if err != nil {
344339
return fmt.Errorf("failed to query users with auto summary enabled: %w", err)
345340
}
346-
defer func() {
347-
if err := rows.Close(); err != nil {
348-
s.logger.WithError(err).Error("Failed to close rows")
349-
}
350-
}()
351-
352-
var userIDs []string
353-
for rows.Next() {
354-
var userID string
355-
if err := rows.Scan(&userID); err != nil {
356-
return fmt.Errorf("failed to scan user ID: %w", err)
357-
}
358-
userIDs = append(userIDs, userID)
359-
}
360341

361342
if len(userIDs) == 0 {
362343
s.logger.Info("No users with auto daily summary enabled")
@@ -381,35 +362,10 @@ func (j *DailySummaryJob) processUserSummaries(ctx context.Context, s *Scheduler
381362
// diariesテーブルから該当ユーザーの日記がある日を取得し、
382363
// diary_summary_daysにsummaryがない日、または要約のupdated_atが日記のupdated_atより古い日を見つける(今日を除く)
383364
// 文字数が1000以上の日記のみ対象とする
384-
query := `
385-
SELECT d.date
386-
FROM diaries d
387-
LEFT JOIN diary_summary_days dsd ON d.user_id = dsd.user_id AND d.date = dsd.date
388-
WHERE d.user_id = $1
389-
AND d.date < CURRENT_DATE
390-
AND LENGTH(d.content) >= 1000
391-
AND (dsd.id IS NULL OR dsd.updated_at < d.updated_at)
392-
ORDER BY d.date
393-
`
394-
395-
rows, err := s.db.QueryContext(ctx, query, userID)
365+
missingDates, err := database.DiaryDatesNeedingDailySummary(ctx, s.db, userID)
396366
if err != nil {
397367
return fmt.Errorf("failed to query missing summaries for user %s: %w", userID, err)
398368
}
399-
defer func() {
400-
if err := rows.Close(); err != nil {
401-
s.logger.WithError(err).Error("Failed to close rows")
402-
}
403-
}()
404-
405-
var missingDates []time.Time
406-
for rows.Next() {
407-
var date time.Time
408-
if err := rows.Scan(&date); err != nil {
409-
return fmt.Errorf("failed to scan date: %w", err)
410-
}
411-
missingDates = append(missingDates, date)
412-
}
413369

414370
if len(missingDates) == 0 {
415371
s.logger.WithField("user_id", userID).Debug("No missing summaries for user")
@@ -467,30 +423,10 @@ func (j *MonthlySummaryJob) Execute(ctx context.Context, s *Scheduler) error {
467423
s.logger.Info("Checking for missing monthly summaries...")
468424

469425
// 1. auto_summary_monthly が true のユーザーを取得
470-
usersQuery := `
471-
SELECT user_id
472-
FROM user_llms
473-
WHERE auto_summary_monthly = true
474-
`
475-
476-
rows, err := s.db.QueryContext(ctx, usersQuery)
426+
userIDs, err := database.UserIDsWithAutoSummaryMonthly(ctx, s.db)
477427
if err != nil {
478428
return fmt.Errorf("failed to query users with auto monthly summary enabled: %w", err)
479429
}
480-
defer func() {
481-
if err := rows.Close(); err != nil {
482-
s.logger.WithError(err).Error("Failed to close rows")
483-
}
484-
}()
485-
486-
var userIDs []string
487-
for rows.Next() {
488-
var userID string
489-
if err := rows.Scan(&userID); err != nil {
490-
return fmt.Errorf("failed to scan user ID: %w", err)
491-
}
492-
userIDs = append(userIDs, userID)
493-
}
494430

495431
if len(userIDs) == 0 {
496432
s.logger.Info("No users with auto monthly summary enabled")
@@ -515,52 +451,10 @@ func (j *MonthlySummaryJob) processUserMonthlySummaries(ctx context.Context, s *
515451
// diariesテーブルから該当ユーザーの日記がある年月を取得し、
516452
// diary_summary_monthsに月次要約がない月、またはその月の日記の最新updated_atより月次要約のupdated_atが古い月を見つける(今月を除く)
517453
// 日記数が1以上の月のみ対象とする
518-
query := `
519-
WITH monthly_diary_stats AS (
520-
SELECT
521-
EXTRACT(YEAR FROM d.date) as year,
522-
EXTRACT(MONTH FROM d.date) as month,
523-
MAX(d.updated_at) as latest_diary_updated_at,
524-
COUNT(*) as diary_count
525-
FROM diaries d
526-
WHERE d.user_id = $1
527-
GROUP BY EXTRACT(YEAR FROM d.date), EXTRACT(MONTH FROM d.date)
528-
HAVING COUNT(*) >= 1
529-
)
530-
SELECT mds.year, mds.month
531-
FROM monthly_diary_stats mds
532-
LEFT JOIN diary_summary_months dsm ON dsm.user_id = $1
533-
AND dsm.year = mds.year
534-
AND dsm.month = mds.month
535-
WHERE (mds.year < EXTRACT(YEAR FROM CURRENT_DATE)
536-
OR (mds.year = EXTRACT(YEAR FROM CURRENT_DATE) AND mds.month < EXTRACT(MONTH FROM CURRENT_DATE)))
537-
AND (dsm.updated_at IS NULL OR dsm.updated_at < mds.latest_diary_updated_at)
538-
ORDER BY mds.year, mds.month
539-
`
540-
541-
rows, err := s.db.QueryContext(ctx, query, userID)
454+
missingMonths, err := database.MonthsNeedingMonthlySummary(ctx, s.db, userID)
542455
if err != nil {
543456
return fmt.Errorf("failed to query missing monthly summaries for user %s: %w", userID, err)
544457
}
545-
defer func() {
546-
if err := rows.Close(); err != nil {
547-
s.logger.WithError(err).Error("Failed to close rows")
548-
}
549-
}()
550-
551-
type YearMonth struct {
552-
Year int
553-
Month int
554-
}
555-
556-
var missingMonths []YearMonth
557-
for rows.Next() {
558-
var year, month int
559-
if err := rows.Scan(&year, &month); err != nil {
560-
return fmt.Errorf("failed to scan year/month: %w", err)
561-
}
562-
missingMonths = append(missingMonths, YearMonth{Year: year, Month: month})
563-
}
564458

565459
if len(missingMonths) == 0 {
566460
s.logger.WithField("user_id", userID).Debug("No missing monthly summaries for user")
@@ -627,30 +521,10 @@ func (j *LatestTrendJob) Execute(ctx context.Context, s *Scheduler) error {
627521
s.logger.Info("Starting latest trend analysis generation")
628522

629523
// 1. auto_latest_trend_enabled が true のユーザーを取得
630-
usersQuery := `
631-
SELECT user_id
632-
FROM user_llms
633-
WHERE auto_latest_trend_enabled = true
634-
`
635-
636-
rows, err := s.db.QueryContext(ctx, usersQuery)
524+
userIDs, err := database.UserIDsWithAutoLatestTrendEnabled(ctx, s.db)
637525
if err != nil {
638526
return fmt.Errorf("failed to query users with auto latest trend enabled: %w", err)
639527
}
640-
defer func() {
641-
if err := rows.Close(); err != nil {
642-
s.logger.WithError(err).Error("Failed to close rows")
643-
}
644-
}()
645-
646-
var userIDs []string
647-
for rows.Next() {
648-
var userID string
649-
if err := rows.Scan(&userID); err != nil {
650-
return fmt.Errorf("failed to scan user ID: %w", err)
651-
}
652-
userIDs = append(userIDs, userID)
653-
}
654528

655529
if len(userIDs) == 0 {
656530
s.logger.Info("No users with auto latest trend enabled")
@@ -717,13 +591,8 @@ func (j *LatestTrendJob) processUserLatestTrend(ctx context.Context, s *Schedule
717591
}
718592

719593
// 対象期間に日記が最小必要数以上存在するかチェック
720-
var count int
721-
checkQuery := `
722-
SELECT COUNT(*) FROM diaries
723-
WHERE user_id = $1
724-
AND date >= $2 AND date <= $3
725-
`
726-
if err := s.db.QueryRowContext(ctx, checkQuery, userID, periodStart, periodEnd).Scan(&count); err != nil {
594+
count, err := database.DiaryCountInDateRange(ctx, s.db, userID, periodStart, periodEnd)
595+
if err != nil {
727596
return fmt.Errorf("failed to check diary entries: %w", err)
728597
}
729598

@@ -798,30 +667,10 @@ func (j *DiaryEmbeddingJob) Execute(ctx context.Context, s *Scheduler) error {
798667
s.logger.Info("Starting diary embedding generation for yesterday's diaries")
799668

800669
// 1. semantic_search_enabled が true のユーザーを取得
801-
usersQuery := `
802-
SELECT user_id
803-
FROM user_llms
804-
WHERE semantic_search_enabled = true
805-
`
806-
807-
rows, err := s.db.QueryContext(ctx, usersQuery)
670+
userIDs, err := database.UserIDsWithSemanticSearchEnabled(ctx, s.db)
808671
if err != nil {
809672
return fmt.Errorf("failed to query users with semantic search enabled: %w", err)
810673
}
811-
defer func() {
812-
if err := rows.Close(); err != nil {
813-
s.logger.WithError(err).Error("Failed to close rows")
814-
}
815-
}()
816-
817-
var userIDs []string
818-
for rows.Next() {
819-
var userID string
820-
if err := rows.Scan(&userID); err != nil {
821-
return fmt.Errorf("failed to scan user ID: %w", err)
822-
}
823-
userIDs = append(userIDs, userID)
824-
}
825674

826675
if len(userIDs) == 0 {
827676
s.logger.Info("No users with semantic search enabled")
@@ -860,35 +709,10 @@ func calculateYesterdayUTC(now time.Time) time.Time {
860709
func (j *DiaryEmbeddingJob) processUserDiaryEmbedding(ctx context.Context, s *Scheduler, userID string, targetDate time.Time) error {
861710
// 対象日付の日記を取得
862711
// embedding未生成またはembeddingのupdated_atより日記のupdated_atが新しい場合に処理対象とする
863-
query := `
864-
SELECT d.id
865-
FROM diaries d
866-
WHERE d.user_id = $1
867-
AND d.date = $2
868-
AND (
869-
NOT EXISTS (SELECT 1 FROM diary_embeddings de WHERE de.diary_id = d.id)
870-
OR (SELECT MAX(de.updated_at) FROM diary_embeddings de WHERE de.diary_id = d.id) < d.updated_at
871-
)
872-
`
873-
874-
rows, err := s.db.QueryContext(ctx, query, userID, targetDate)
712+
diaryIDs, err := database.DiaryIDsNeedingEmbedding(ctx, s.db, userID, targetDate)
875713
if err != nil {
876714
return fmt.Errorf("failed to query diary for user %s date %s: %w", userID, targetDate.Format("2006-01-02"), err)
877715
}
878-
defer func() {
879-
if err := rows.Close(); err != nil {
880-
s.logger.WithError(err).Error("Failed to close rows")
881-
}
882-
}()
883-
884-
var diaryIDs []string
885-
for rows.Next() {
886-
var diaryID string
887-
if err := rows.Scan(&diaryID); err != nil {
888-
return fmt.Errorf("failed to scan diary ID: %w", err)
889-
}
890-
diaryIDs = append(diaryIDs, diaryID)
891-
}
892716

893717
if len(diaryIDs) == 0 {
894718
s.logger.WithFields(map[string]any{

0 commit comments

Comments
 (0)