Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/serve_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
# ── Moderator community loops ──
daily_moderator_source_voting.to_deployment(
name="Daily Moderator Source Voting",
schedules=[CronSchedule(cron="0 10 * * *", timezone=MSK)],
schedules=[CronSchedule(cron="*/15 * * * *", timezone=MSK)],
),
# ── Broadcasts ──
broadcast_next_meme_to_active_15m_ago.to_deployment(
Expand Down
24 changes: 18 additions & 6 deletions specs/moderator-community-loop.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,20 @@ If the poll is closed, answer `Голосование уже закрыто` and

## Daily Source Cycle

Run once every 24 hours from a small scheduled flow or admin command.
Run from a small scheduled flow or admin command. The production schedule may
check frequently (for example every 15 minutes) so early negative polls can be
closed soon after they become eligible.

Order matters:

1. Post the **Next-Day Source Report** for the last source that passed and was enabled in a previous cycle, if it has not been reported yet.
2. Close the currently `open` poll, if one exists:
2. Close the currently `open` poll if its 24-hour window has elapsed or if it
matches the early negative rule:
- recompute likes/dislikes from `meme_source_candidate_vote`;
- mark the poll `passed`, `rejected`, or `expired_no_quorum`;
- edit the moderator-chat message so voting is visibly closed.
- edit the moderator-chat message so it keeps the source URL and shows
final vote results;
- unpin the closed voting message.
3. If the closed poll passed, enable the prepared source:
- load `poll.prepared_meme_source_id` or the candidate's `promoted_meme_source_id`;
- set `language_code='ru'`;
Expand All @@ -208,15 +213,17 @@ Order matters:
- create or reuse a `meme_source` row with `status='in_moderation'`;
- fetch the latest public Telegram posts once;
- inspect the fetched posts for Cyrillic evidence;
- if Cyrillic is absent, dismiss the candidate and stop the cycle for the day without posting a poll;
- if Cyrillic is absent, dismiss the candidate and stop the current run without posting a poll;
- if Cyrillic is present, save the fetched posts into `meme_raw_telegram`;
- mark the candidate `prepared` and store `promoted_meme_source_id`.
6. Insert `meme_source_candidate_poll` as `draft`, with `prepared_meme_source_id` and `closes_at = now() + interval '24 hours'`.
7. Send Telegram message to `TELEGRAM_MODERATOR_CHAT_ID` with callback buttons containing the new `poll_id`.
8. Update poll with `chat_id`, `message_id`, `opened_at`, `status='open'`.
9. If send fails, mark `status='cancelled'` and keep the prepared source in `in_moderation` for an admin retry.

Hard limit: 1 new source poll per day. The point is a steady community ritual, not a high-volume ops feed.
Hard limit: exactly one active source poll at a time. The point is a steady
community ritual, not a high-volume ops feed; early-rejected non-meme sources
may be replaced before the full 24-hour window.

Message content should include:

Expand Down Expand Up @@ -285,7 +292,12 @@ Outcomes:

Rejected sources must not return to the automatic daily cycle. If the owner wants to revisit one, they can add it manually later.

Do not early-close. The daily cycle closes the poll after the full 24-hour window.
Early negative close: after the poll has been open for at least 90 minutes,
if it has 0 likes and at least 6 dislikes, close it immediately as
`rejected`, set the prepared source to `parsing_disabled`, dismiss the
candidate with `dismissed_reason='source_vote:{poll_id}:early_negative_not_meme_source'`,
write `meme_source.data.source_vote_rejection.reason='early_negative_not_meme_source'`,
and try to post the next candidate.

## Passing Source Flow

Expand Down
14 changes: 14 additions & 0 deletions src/flows/storage/describe_memes.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from src.flows.events import safe_emit
from src.flows.hooks import notify_telegram_on_failure
from src.redis import redis_client
from src.storage.service import find_meme_duplicate, resolve_meme_duplicate
from src.storage.upload import download_meme_content_from_tg

OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
Expand Down Expand Up @@ -242,6 +243,7 @@ async def get_memes_to_describe(limit: int = 30) -> list[dict]:
M.id,
M.telegram_file_id,
M.ocr_result,
M.status,
M.language_code
FROM meme M
LEFT JOIN meme_stats MS ON MS.meme_id = M.id
Expand Down Expand Up @@ -646,6 +648,18 @@ async def describe_single_meme(meme_row: dict, log, *, deadline: float | None =

update_query = meme.update().where(meme.c.id == meme_id).values(**update_kwargs).returning(meme)
await fetch_one(update_query)

if meme_row.get("status") == "ok":
duplicate_meme_id = await find_meme_duplicate(meme_id, merged.get("text", ""))
if duplicate_meme_id:
result = await resolve_meme_duplicate(meme_id, duplicate_meme_id)
log.info(
"Meme %s resolved as OCR duplicate of %s after describe: %s",
meme_id,
duplicate_meme_id,
result,
)

return "ok"


Expand Down
5 changes: 5 additions & 0 deletions src/flows/storage/memes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
get_pending_memes,
get_unloaded_tg_memes,
get_unloaded_vk_memes,
resolve_all_file_id_duplicates,
resolve_meme_duplicate,
update_meme,
update_meme_status_of_ready_memes,
Expand Down Expand Up @@ -220,6 +221,10 @@ async def final_meme_pipeline() -> None:
# next step of a pipeline
await update_meme_status_of_ready_memes()

file_id_duplicates = await resolve_all_file_id_duplicates()
if file_id_duplicates["resolved"]:
logger.info("Resolved file_id duplicates: %s", file_id_duplicates)

safe_emit(
"ff.pipeline.final.completed",
"ff.pipeline.final",
Expand Down
106 changes: 94 additions & 12 deletions src/storage/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,15 +434,16 @@ async def find_meme_duplicate(meme_id: int, imagetext: str) -> int | None:
async def resolve_meme_duplicate(dupe_id: int, original_id: int) -> dict[str, int]:
"""Mark a meme as duplicate with full cleanup.

1. Move reactions from dupe → original (skip conflicts)
2. Delete remaining reactions on dupe
3. Delete meme_stats for dupe
4. Set meme status='duplicate', duplicate_of=original_id

Stats for original will auto-recalculate on next 5-15 min cycle.
Returns counts: {moved, conflicts, deleted_stats}.
1. Move user reactions from dupe -> original (skip conflicts)
2. Move group-chat reactions from dupe -> original (skip conflicts)
3. Delete remaining reactions on dupe
4. Delete meme_stats for dupe
5. Set meme status='duplicate', duplicate_of=original_id
6. Refresh basic meme_stats counters for original

Returns counts for moved/deleted reaction rows.
"""
# 1. Move non-conflicting reactions to original
# 1. Move non-conflicting user reactions to original
move_query = text(
"""
WITH moved AS (
Expand All @@ -465,7 +466,31 @@ async def resolve_meme_duplicate(dupe_id: int, original_id: int) -> dict[str, in
res = await fetch_one(move_query, {"dupe_id": dupe_id, "original_id": original_id})
moved = res["moved"] if res else 0

# 2. Delete all reactions remaining on dupe (conflicts + already moved)
# 2. Move non-conflicting group-chat reactions to original
move_chat_reactions = text(
"""
WITH moved AS (
INSERT INTO chat_meme_reaction
(chat_id, meme_id, user_id, reaction, reacted_at)
SELECT chat_id, :original_id, user_id, reaction, reacted_at
FROM chat_meme_reaction
WHERE meme_id = :dupe_id
AND NOT EXISTS (
SELECT 1 FROM chat_meme_reaction existing
WHERE existing.chat_id = chat_meme_reaction.chat_id
AND existing.user_id = chat_meme_reaction.user_id
AND existing.meme_id = :original_id
)
ON CONFLICT (chat_id, meme_id, user_id) DO NOTHING
RETURNING 1
)
SELECT count(*) AS moved FROM moved
"""
)
res = await fetch_one(move_chat_reactions, {"dupe_id": dupe_id, "original_id": original_id})
chat_moved = res["moved"] if res else 0

# 3. Delete all reactions remaining on dupe (conflicts + already moved)
delete_reactions = text(
"""
WITH deleted AS (
Expand All @@ -477,13 +502,24 @@ async def resolve_meme_duplicate(dupe_id: int, original_id: int) -> dict[str, in
res = await fetch_one(delete_reactions, {"dupe_id": dupe_id})
conflicts = res["conflicts"] if res else 0

# 3. Delete meme_stats for dupe (stale, will not regenerate since no reactions)
delete_chat_reactions = text(
"""
WITH deleted AS (
DELETE FROM chat_meme_reaction WHERE meme_id = :dupe_id RETURNING 1
)
SELECT count(*) AS deleted FROM deleted
"""
)
res = await fetch_one(delete_chat_reactions, {"dupe_id": dupe_id})
chat_deleted = res["deleted"] if res else 0

# 4. Delete meme_stats for dupe (stale, will not regenerate since no reactions)
await execute(
text("DELETE FROM meme_stats WHERE meme_id = :dupe_id"),
{"dupe_id": dupe_id},
)

# 4. Mark meme as duplicate
# 5. Mark meme as duplicate
await execute(
text(
"""
Expand All @@ -495,7 +531,53 @@ async def resolve_meme_duplicate(dupe_id: int, original_id: int) -> dict[str, in
{"dupe_id": dupe_id, "original_id": original_id},
)

return {"moved": moved, "conflicts": conflicts}
# 6. Refresh basic original counters now. Incremental stats only revisits recent
# reactions, while duplicate cleanup often moves old rows.
await execute(
text(
"""
INSERT INTO meme_stats (
meme_id, nlikes, ndislikes, nmemes_sent, age_days, sec_to_react, updated_at
)
SELECT
M.id AS meme_id,
COUNT(R.*) FILTER (WHERE R.reaction_id = 1) AS nlikes,
COUNT(R.*) FILTER (WHERE R.reaction_id = 2) AS ndislikes,
COUNT(R.*) AS nmemes_sent,
EXTRACT('DAYS' FROM NOW() - M.published_at)::int AS age_days,
COALESCE(EXTRACT(
EPOCH FROM percentile_cont(0.5)
WITHIN GROUP (ORDER BY R.reacted_at - R.sent_at)
FILTER (
WHERE R.reacted_at - R.sent_at
BETWEEN '0.5 second'
AND '1 minute'
)
), 99999) AS sec_to_react,
NOW() AS updated_at
FROM meme M
LEFT JOIN user_meme_reaction R
ON R.meme_id = M.id
WHERE M.id = :original_id
GROUP BY M.id
ON CONFLICT (meme_id) DO UPDATE SET
nlikes = EXCLUDED.nlikes,
ndislikes = EXCLUDED.ndislikes,
nmemes_sent = EXCLUDED.nmemes_sent,
age_days = EXCLUDED.age_days,
sec_to_react = EXCLUDED.sec_to_react,
updated_at = EXCLUDED.updated_at
"""
),
{"original_id": original_id},
)

return {
"moved": moved,
"conflicts": conflicts,
"chat_moved": chat_moved,
"chat_deleted": chat_deleted,
}


async def resolve_all_file_id_duplicates() -> dict[str, int]:
Expand Down
Loading
Loading