Skip to content

Commit e022425

Browse files
authored
Merge pull request #234 from Willy-JL/fix/indexer-qol
Indexer: Bugfixes and Last Updated date improvements part 1
2 parents 2811ad8 + ddbed5d commit e022425

4 files changed

Lines changed: 61 additions & 49 deletions

File tree

indexer/cache.py

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
SHORT_TTL = dt.timedelta(days=2).total_seconds()
1818
LAST_CHANGE_ELIGIBLE_FIELDS = (
1919
"name",
20-
"thread_version",
20+
"version",
2121
"developer",
2222
"type",
2323
"status",
@@ -29,7 +29,10 @@
2929
"tags",
3030
"unknown_tags",
3131
"image_url",
32+
"previews_urls",
3233
"downloads",
34+
"reviews_total",
35+
"reviews",
3336
"INDEX_ERROR",
3437
)
3538

@@ -38,13 +41,13 @@
3841
locks_lock = asyncio.Lock()
3942
locks: dict[asyncio.Lock] = {}
4043

41-
CACHE_KEYWORDS = (
42-
LAST_CACHED := "LAST_CACHED",
44+
LAST_CACHED = "LAST_CACHED"
45+
EXPIRE_TIME = "EXPIRE_TIME"
46+
INDEX_ERROR = "INDEX_ERROR"
47+
INTERNAL_KEYWORDS = (
4348
CACHED_WITH := "CACHED_WITH",
4449
LAST_CHANGE := "LAST_CHANGE",
45-
EXPIRE_TIME := "EXPIRE_TIME",
4650
HASHED_META := "HASHED_META",
47-
INDEX_ERROR := "INDEX_ERROR",
4851
)
4952
NAME_FORMAT = "thread:{id}"
5053

@@ -96,28 +99,19 @@ async def get_thread(id: int) -> dict[str, str]:
9699

97100
thread = await redis.hgetall(name)
98101

99-
# Don't return thread data (there might be some) if an error flag is active
100-
if thread.get(INDEX_ERROR):
101-
return {INDEX_ERROR: thread[INDEX_ERROR]}
102-
103102
# Remove internal fields from response
104-
for key in CACHE_KEYWORDS:
103+
for key in INTERNAL_KEYWORDS:
105104
if key in thread:
106105
del thread[key]
107106
return thread
108107

109108

110109
async def _is_thread_cache_outdated(id: int, name: str) -> bool:
111-
last_cached, cached_with, expire_time = await redis.hmget(
112-
name, (LAST_CACHED, CACHED_WITH, EXPIRE_TIME)
113-
)
110+
last_cached, expire_time = await redis.hmget(name, (LAST_CACHED, EXPIRE_TIME))
114111
if last_cached and not expire_time:
115112
expire_time = int(last_cached) + CACHE_TTL
116-
return (
117-
not last_cached # Never cached
118-
or time.time() >= int(expire_time) # Cache expired
119-
# or cached_with != meta.version # Cached on different version
120-
)
113+
# Never cached or cache expired
114+
return not last_cached or time.time() >= int(expire_time)
121115

122116

123117
async def _maybe_update_thread_cache(id: int, name: str) -> None:
@@ -148,11 +142,6 @@ async def _update_thread_cache(id: int, name: str) -> None:
148142
INDEX_ERROR: result.error_flag,
149143
EXPIRE_TIME: int(now + result.retry_delay),
150144
}
151-
if result is f95zone.ERROR_THREAD_MISSING:
152-
# F95zone responded but thread is missing, remove any previous cache
153-
await redis.delete(name)
154-
if last_change := old_fields.get(LAST_CHANGE):
155-
new_fields[LAST_CHANGE] = last_change
156145
# Consider new error as a change
157146
if old_fields.get(INDEX_ERROR) != new_fields.get(INDEX_ERROR):
158147
new_fields[LAST_CHANGE] = int(now)

indexer/f95zone.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
b"<p>Automated backups are currently executing. During this time, the site will be unavailable</p>",
3939
b"<title>F95Zone :: Scheduled Maintenance</title>",
4040
b'<script src="https://static.f95zone.to/assets/SamF95/ErrorPage',
41-
b'<div class="blockMessage"><p>Please check back in 10 mins</p></div>'
41+
b'<div class="blockMessage"><p>Please check back in 10 mins</p></div>',
4242
)
4343

4444
logger = logging.getLogger(__name__)

indexer/scraper.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,12 @@ async def thread(id: int) -> dict[str, str] | f95zone.IndexerError | None:
127127
parser.attachment(preview_url)
128128
for preview_url in update["screens"]
129129
] or ret.previews_urls
130-
ret.last_updated = parser.datestamp(update["ts"])
130+
last_promoted = parser.datestamp(update["ts"])
131+
if (
132+
ret.last_updated > time.time() # Only if thread has a typo
133+
or last_promoted > ret.last_updated # Or it's outdated
134+
):
135+
ret.last_updated = last_promoted
131136
break
132137
else: # Didn't break
133138
continue
@@ -141,7 +146,7 @@ async def thread(id: int) -> dict[str, str] | f95zone.IndexerError | None:
141146
async with f95zone.RATELIMIT:
142147
try:
143148
async with f95zone.session.get(
144-
thread_url + "/br-reviews",
149+
thread_url + "/br-reviews/",
145150
cookies=f95zone.cookies,
146151
) as req:
147152
if req.status == 429 and retries > 1:
@@ -159,17 +164,22 @@ async def thread(id: int) -> dict[str, str] | f95zone.IndexerError | None:
159164
if index_error := f95zone.check_error(res, logger):
160165
return index_error
161166

162-
if not str(req.real_url).endswith("br-reviews"):
167+
if not str(req.real_url).rstrip("/").endswith("br-reviews"):
163168
# Some threads have reviews disabled
164169
reviews = parser.ParsedReviews(total=0, items=[])
165170
else:
166171
reviews = await loop.run_in_executor(None, parser.reviews, res)
167172
if isinstance(reviews, parser.ParserError):
168173

169-
if reviews.message == "Thread structure missing" and req.status in (403, 404):
174+
if reviews.message == "Thread structure missing" and req.status in (
175+
403,
176+
404,
177+
):
170178
return f95zone.ERROR_THREAD_MISSING
171179

172-
logger.error(f"Thread {id} reviews parsing failed: {reviews.message}\n{reviews.dump}")
180+
logger.error(
181+
f"Thread {id} reviews parsing failed: {reviews.message}\n{reviews.dump}"
182+
)
173183
return f95zone.ERROR_PARSING_FAILED
174184

175185
reviews.items = [dataclasses.asdict(review) for review in reviews.items]

indexer/watcher.py

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ async def watch_updates():
8181
try:
8282
updates = json.loads(res)
8383
except Exception:
84-
raise Exception(f"Latest updates returned invalid JSON: {res}")
84+
raise Exception(
85+
f"Latest updates returned invalid JSON: {res}"
86+
)
8587
if index_error := f95zone.check_error(updates, logger):
8688
raise Exception(index_error)
8789

@@ -94,7 +96,9 @@ async def watch_updates():
9496
for update in updates["msg"]["data"]:
9597
name = cache.NAME_FORMAT.format(id=update["thread_id"])
9698
names.append(name)
97-
cached_data.hmget(name, "version", cache.HASHED_META)
99+
cached_data.hmget(
100+
name, "version", cache.HASHED_META, cache.LAST_CACHED
101+
)
98102
version = update["version"]
99103
if version == "Unknown":
100104
version = None
@@ -114,18 +118,19 @@ async def watch_updates():
114118
cached_data = await cached_data.execute()
115119

116120
assert len(names) == len(current_data) == len(cached_data)
117-
for name, (version, meta), (cached_version, cached_meta) in zip(
118-
names, current_data, cached_data
119-
):
120-
if cached_version is None:
121+
for (
122+
name,
123+
(version, meta),
124+
(cached_version, cached_meta, last_cached),
125+
) in zip(names, current_data, cached_data):
126+
if cached_version is None or not last_cached:
121127
continue
122128

123129
version_outdated = version and version != cached_version
124130
meta_outdated = meta != cached_meta
125131

126132
if version_outdated or meta_outdated:
127-
# Delete version too to avoid watch_versions() picking it up as mismatch
128-
invalidate_cache.hdel(name, cache.LAST_CACHED, "version")
133+
invalidate_cache.hdel(name, cache.LAST_CACHED)
129134
invalidate_cache.hset(name, cache.HASHED_META, meta)
130135
logger.info(
131136
f"Updates: Invalidating cache for {name}"
@@ -175,16 +180,18 @@ async def watch_versions():
175180
async with asyncio.timeout(f95zone.TIMEOUT.total):
176181
logger.info("Poll versions start")
177182

178-
names = [n async for n in cache.redis.scan_iter("thread:*", 10000, "hash")]
183+
names = [
184+
n async for n in cache.redis.scan_iter("thread:*", 10000, "hash")
185+
]
179186
invalidate_cache = cache.redis.pipeline()
180187

181188
for names_chunk in chunks(names, WATCH_VERSIONS_CHUNK_SIZE):
182189

183-
cached_versions = cache.redis.pipeline()
190+
cached_data = cache.redis.pipeline()
184191
csv = ""
185192
ids = []
186193
for name in names_chunk:
187-
cached_versions.hget(name, "version")
194+
cached_data.hmget(name, "version", cache.LAST_CACHED)
188195
id = name.split(":")[1]
189196
csv += f"{id},"
190197
ids.append(id)
@@ -195,8 +202,8 @@ async def watch_versions():
195202
f95zone.VERCHK_URL.format(threads=csv),
196203
) as req:
197204
# Await together for efficiency
198-
res, cached_versions = await asyncio.gather(
199-
req.read(), cached_versions.execute()
205+
res, cached_data = await asyncio.gather(
206+
req.read(), cached_data.execute()
200207
)
201208
except Exception as exc:
202209
if index_error := f95zone.check_error(exc, logger):
@@ -210,24 +217,28 @@ async def watch_versions():
210217
versions = json.loads(res)
211218
except Exception:
212219
raise Exception(f"Versions API returned invalid JSON: {res}")
213-
if versions.get("msg") in ("Missing threads data", "Thread not found"):
220+
if versions.get("msg") in (
221+
"Missing threads data",
222+
"Thread not found",
223+
):
214224
versions["status"] = "ok"
215225
versions["msg"] = {}
216226
if index_error := f95zone.check_error(versions, logger):
217227
raise Exception(index_error)
218228
versions = versions["msg"]
219229

220-
assert len(names_chunk) == len(ids) == len(cached_versions)
221-
for name, id, cached_version in zip(names_chunk, ids, cached_versions):
222-
if cached_version is None:
230+
assert len(names_chunk) == len(ids) == len(cached_data)
231+
for name, id, (cached_version, last_cached) in zip(
232+
names_chunk, ids, cached_data
233+
):
234+
if cached_version is None or not last_cached:
223235
continue
224236
version = versions.get(id)
225237
if not version or version == "Unknown":
226238
continue
227239

228240
if version != cached_version:
229-
# Delete version too to avoid ending up here again
230-
invalidate_cache.hdel(name, cache.LAST_CACHED, "version")
241+
invalidate_cache.hdel(name, cache.LAST_CACHED)
231242
logger.warning(
232243
f"Versions: Invalidating cache for {name}"
233244
f" ({cached_version!r} -> {version!r})"
@@ -236,7 +247,9 @@ async def watch_versions():
236247
if len(invalidate_cache):
237248
result = await invalidate_cache.execute()
238249
invalidated = sum(ret != "0" for ret in result)
239-
logger.warning(f"Versions: Invalidated cache for {invalidated} threads")
250+
logger.warning(
251+
f"Versions: Invalidated cache for {invalidated} threads"
252+
)
240253

241254
logger.info("Poll versions done")
242255

0 commit comments

Comments
 (0)