Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions deepdoc/parser/mineru_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ def _transfer_to_sections(self, outputs: list[dict[str, Any]], parse_method: str
for output in outputs:
match output["type"]:
case MinerUContentType.TEXT:
section = output["text"]
section = output.get("text", "")
case MinerUContentType.TABLE:
section = output.get("table_body", "") + "\n".join(output.get("table_caption", [])) + "\n".join(
output.get("table_footnote", []))
Expand All @@ -521,13 +521,13 @@ def _transfer_to_sections(self, outputs: list[dict[str, Any]], parse_method: str
section = "".join(output.get("image_caption", [])) + "\n" + "".join(
output.get("image_footnote", []))
case MinerUContentType.EQUATION:
section = output["text"]
section = output.get("text", "")
case MinerUContentType.CODE:
section = output["code_body"] + "\n".join(output.get("code_caption", []))
section = output.get("code_body", "") + "\n".join(output.get("code_caption", []))
case MinerUContentType.LIST:
section = "\n".join(output.get("list_items", []))
case MinerUContentType.DISCARDED:
pass
continue # Skip discarded blocks entirely

if section and parse_method == "manual":
sections.append((section, output["type"], self._line_tag(output)))
Expand Down
2 changes: 1 addition & 1 deletion rag/app/naive.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
"parser_config", {
"chunk_token_num": 512, "delimiter": "\n!?。;!?", "layout_recognize": "DeepDOC", "analyze_hyperlink": True})

child_deli = parser_config.get("children_delimiter", "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')
child_deli = (parser_config.get("children_delimiter") or "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')
cust_child_deli = re.findall(r"`([^`]+)`", child_deli)
child_deli = "|".join(re.sub(r"`([^`]+)`", "", child_deli))
if cust_child_deli:
Expand Down
13 changes: 13 additions & 0 deletions rag/svr/task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,19 @@ async def task_manager():


async def main():
# Stagger executor startup to prevent connection storm to Infinity
# Extract worker number from CONSUMER_NAME (e.g., "task_executor_abc123_5" -> 5)
try:
worker_num = int(CONSUMER_NAME.rsplit("_", 1)[-1])
# Add random delay: base delay + worker_num * 2.0s + random jitter
# This spreads out connection attempts over several seconds
startup_delay = worker_num * 2.0 + random.uniform(0, 0.5)
if startup_delay > 0:
logging.info(f"Staggering startup by {startup_delay:.2f}s to prevent connection storm")
await asyncio.sleep(startup_delay)
except (ValueError, IndexError):
pass # Non-standard consumer name, skip delay

logging.info(r"""
____ __ _
/ _/___ ____ ____ _____/ /_(_)___ ____ ________ ______ _____ _____
Expand Down
2 changes: 1 addition & 1 deletion rag/utils/infinity_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def __init__(self):
logger.info(f"Use Infinity {infinity_uri} as the doc engine.")
for _ in range(24):
try:
connPool = ConnectionPool(infinity_uri, max_size=32)
connPool = ConnectionPool(infinity_uri, max_size=4)
inf_conn = connPool.get_conn()
res = inf_conn.show_current_node()
if res.error_code == ErrorCode.OK and res.server_status in ["started", "alive"]:
Expand Down