Commit 79b092f

concertdictateuser210 authored and committed
task executor issues (infiniflow#12006)
### What problem does this PR solve?

**Fixes infiniflow#8706** - `InfinityException: TOO_MANY_CONNECTIONS` when running multiple task executor workers

### Problem Description

When running RAGFlow with 8-16 task executor workers, most workers fail to start properly. Checking logs revealed that workers were stuck/hanging during Infinity connection initialization - only 1-2 workers would successfully register in Redis while the rest remained blocked.

### Root Cause

The Infinity SDK `ConnectionPool` pre-allocates all connections in `__init__`. With the default `max_size=32` and multiple workers (e.g., 16), this creates 16×32=512 connections immediately on startup, exceeding Infinity's default 128 connection limit. Workers hang while waiting for connections that can never be established.

### Changes

1. **Prevent Infinity connection storm** (`rag/utils/infinity_conn.py`, `rag/svr/task_executor.py`)
   - Reduced ConnectionPool `max_size` from 32 to 4 (sufficient since operations are synchronous)
   - Added staggered startup delay (2s per worker) to spread connection initialization
2. **Handle None children_delimiter** (`rag/app/naive.py`)
   - Use `or ""` to handle explicitly set None values from parser config
3. **MinerU parser robustness** (`deepdoc/parser/mineru_parser.py`)
   - Use `.get()` for optional output fields that may be missing
   - Fix DISCARDED block handling: change `pass` to `continue` to skip discarded blocks entirely

### Why `max_size=4` is sufficient

| Workers | Pool Size | Total Connections | Infinity Limit |
|---------|-----------|-------------------|----------------|
| 16 | 32 | 512 | 128 ❌ |
| 16 | 4 | 64 | 128 ✅ |
| 32 | 4 | 128 | 128 ✅ |

- All RAGFlow operations are synchronous: `get_conn()` → operation → `release_conn()`
- No parallel `docStoreConn` operations in the codebase
- Maximum 1-2 concurrent connections needed per worker; 4 provides a safety margin

### MinerU DISCARDED block bug

When MinerU returns blocks with `type: "discarded"` (headers, footers, watermarks, page numbers, artifacts), the previous code used `pass`, which left the `section` variable undefined, causing:

- **UnboundLocalError** if DISCARDED is the first block
- **Duplicate content** if DISCARDED follows another block (stale value from the previous iteration)

**Root cause confirmed via MinerU source code:**

From [`mineru/utils/enum_class.py`](https://github.com/opendatalab/MinerU/blob/main/mineru/utils/enum_class.py#L14):

```python
class BlockType:
    DISCARDED = 'discarded'
    # VLM 2.5+ also has: HEADER, FOOTER, PAGE_NUMBER, ASIDE_TEXT, PAGE_FOOTNOTE
```

Per [MinerU documentation](https://opendatalab.github.io/MinerU/reference/output_files/), discarded blocks contain content that should be filtered out for clean text extraction.

**Fix:** Changed `pass` to `continue` to skip discarded blocks entirely.

### Testing

- Verified all 16 workers now register successfully in Redis
- All workers heartbeating correctly
- Document parsing works as expected
- MinerU parsing with DISCARDED blocks no longer crashes

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Co-authored-by: user210 <user210@rt>
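As a quick check of the pool-size table in the description above, here is a tiny arithmetic sketch; the worker counts and the 128-connection default limit are taken from the description, and nothing here is RAGFlow code:

```python
# Sanity-check the "Why max_size=4 is sufficient" table above.
INFINITY_CONN_LIMIT = 128  # Infinity's default limit, as cited in the description

for workers, pool_size in [(16, 32), (16, 4), (32, 4)]:
    total = workers * pool_size
    verdict = "OK" if total <= INFINITY_CONN_LIMIT else "exceeds limit"
    print(f"{workers} workers x max_size={pool_size} = {total} connections -> {verdict}")
```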
1 parent f0a296d · commit 79b092f

File tree

4 files changed: +19 −6 lines


deepdoc/parser/mineru_parser.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -511,7 +511,7 @@ def _transfer_to_sections(self, outputs: list[dict[str, Any]], parse_method: str
         for output in outputs:
             match output["type"]:
                 case MinerUContentType.TEXT:
-                    section = output["text"]
+                    section = output.get("text", "")
                 case MinerUContentType.TABLE:
                     section = output.get("table_body", "") + "\n".join(output.get("table_caption", [])) + "\n".join(
                         output.get("table_footnote", []))
@@ -521,13 +521,13 @@ def _transfer_to_sections(self, outputs: list[dict[str, Any]], parse_method: str
                     section = "".join(output.get("image_caption", [])) + "\n" + "".join(
                         output.get("image_footnote", []))
                 case MinerUContentType.EQUATION:
-                    section = output["text"]
+                    section = output.get("text", "")
                 case MinerUContentType.CODE:
-                    section = output["code_body"] + "\n".join(output.get("code_caption", []))
+                    section = output.get("code_body", "") + "\n".join(output.get("code_caption", []))
                 case MinerUContentType.LIST:
                     section = "\n".join(output.get("list_items", []))
                 case MinerUContentType.DISCARDED:
-                    pass
+                    continue  # Skip discarded blocks entirely

             if section and parse_method == "manual":
                 sections.append((section, output["type"], self._line_tag(output)))
```
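
The DISCARDED fix above is easiest to see in isolation. The following is a minimal, self-contained reproduction with hypothetical block data (not RAGFlow or MinerU code): with `pass`, `section` is unbound on the first iteration or keeps the previous block's value, while `continue` skips the block cleanly.

```python
# Hypothetical MinerU-style blocks; only the "type"/"text" keys are assumed here.
outputs = [
    {"type": "text", "text": "Real paragraph"},
    {"type": "discarded", "text": "Page 3"},  # header/footer/watermark noise
]

def broken(blocks):
    sections = []
    for output in blocks:
        match output["type"]:
            case "text":
                section = output.get("text", "")
            case "discarded":
                pass  # leaves `section` stale (UnboundLocalError if this is the first block)
        if section:
            sections.append(section)
    return sections

def fixed(blocks):
    sections = []
    for output in blocks:
        match output["type"]:
            case "text":
                section = output.get("text", "")
            case "discarded":
                continue  # skip discarded blocks entirely
        if section:
            sections.append(section)
    return sections

print(broken(outputs))  # ['Real paragraph', 'Real paragraph'] -- duplicated content
print(fixed(outputs))   # ['Real paragraph']
```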

rag/app/naive.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -651,7 +651,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
         "parser_config", {
             "chunk_token_num": 512, "delimiter": "\n!?。;!?", "layout_recognize": "DeepDOC", "analyze_hyperlink": True})

-    child_deli = parser_config.get("children_delimiter", "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')
+    child_deli = (parser_config.get("children_delimiter") or "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')
     cust_child_deli = re.findall(r"`([^`]+)`", child_deli)
     child_deli = "|".join(re.sub(r"`([^`]+)`", "", child_deli))
     if cust_child_deli:
```
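
The `or ""` change above guards against a key that is present but explicitly set to `None`. A short sketch of the difference (illustrative config dicts; the surrounding RAGFlow code is simplified away):

```python
# dict.get(key, default) only falls back when the key is ABSENT, so an explicit
# None still reaches .encode() and raises AttributeError.
for parser_config in ({}, {"children_delimiter": None}, {"children_delimiter": "\\n"}):
    old = parser_config.get("children_delimiter", "")      # None when the key holds None
    new = parser_config.get("children_delimiter") or ""    # always a str
    try:
        old.encode("utf-8")
        old_result = "ok"
    except AttributeError:
        old_result = "AttributeError"
    print(parser_config, "-> old:", old_result, "| new:", repr(new))
```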

rag/svr/task_executor.py

Lines changed: 13 additions & 0 deletions
```diff
@@ -1165,6 +1165,19 @@ async def task_manager():


 async def main():
+    # Stagger executor startup to prevent connection storm to Infinity
+    # Extract worker number from CONSUMER_NAME (e.g., "task_executor_abc123_5" -> 5)
+    try:
+        worker_num = int(CONSUMER_NAME.rsplit("_", 1)[-1])
+        # Add random delay: base delay + worker_num * 2.0s + random jitter
+        # This spreads out connection attempts over several seconds
+        startup_delay = worker_num * 2.0 + random.uniform(0, 0.5)
+        if startup_delay > 0:
+            logging.info(f"Staggering startup by {startup_delay:.2f}s to prevent connection storm")
+            await asyncio.sleep(startup_delay)
+    except (ValueError, IndexError):
+        pass  # Non-standard consumer name, skip delay
+
     logging.info(r"""
   ____ __ _
  / _/___ ____ ____ _____/ /_(_)___ ____ ________ ______ _____ _____
```
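
For context, the delay schedule this produces can be sketched as follows, assuming 16 workers whose consumer names end in `_0` through `_15` (same formula as the change above, run standalone outside RAGFlow):

```python
import random

# worker_num * 2.0s plus up to 0.5s of jitter, as in the change above.
for worker_num in range(16):
    delay = worker_num * 2.0 + random.uniform(0, 0.5)
    print(f"worker {worker_num:2d}: first Infinity connection after ~{delay:.1f}s")

# Worker 0 starts almost immediately; worker 15 waits roughly 30s, so the 16
# connection pools are created a couple of seconds apart instead of all at once.
```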

rag/utils/infinity_conn.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -183,7 +183,7 @@ def __init__(self):
         logger.info(f"Use Infinity {infinity_uri} as the doc engine.")
         for _ in range(24):
             try:
-                connPool = ConnectionPool(infinity_uri, max_size=32)
+                connPool = ConnectionPool(infinity_uri, max_size=4)
                 inf_conn = connPool.get_conn()
                 res = inf_conn.show_current_node()
                 if res.error_code == ErrorCode.OK and res.server_status in ["started", "alive"]:
```
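
The reason `max_size` matters at startup is that the pool is filled eagerly, as described in the commit message. A minimal sketch of that behavior, purely illustrative and not the Infinity SDK's actual implementation:

```python
# Illustrative stand-in for an eager connection pool: every connection is
# opened in __init__, before any query runs. With 16 workers and max_size=32
# that is 512 upfront connections; with max_size=4 it is only 64.
class EagerPool:
    def __init__(self, uri: str, max_size: int = 32):
        self._free = [self._connect(uri) for _ in range(max_size)]  # all opened up front

    def _connect(self, uri: str) -> str:
        return f"conn-to-{uri}"  # stand-in for a real socket handshake

    def get_conn(self) -> str:
        return self._free.pop()

    def release_conn(self, conn: str) -> None:
        self._free.append(conn)

pool = EagerPool("infinity-host:23817", max_size=4)   # hypothetical URI
conn = pool.get_conn()      # synchronous get -> operation -> release, as described above
pool.release_conn(conn)
```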
