Changes from all commits (19 commits)
ca33346  feat(table-parser): add column-level vectorize/metadata/both roles to… (Mar 15, 2026)
2fdd509  feat(table-parser): add parser config schema, column names passthroug… (Mar 15, 2026)
60a8137  feat(table-parser): add auto/manual mode toggle for column role confi… (Mar 15, 2026)
b62499a  fix(table-parser): merge KB-level column role config into doc parser_… (Mar 19, 2026)
5a1a40d  feat(table-parser): integrate DocMetadataService for metadata-role co… (Mar 19, 2026)
148f2af  merge: resolve validation_utils.py conflict with upstream main (Mar 19, 2026)
c80dcc2  fix(table-parser): store raw metadata values alongside tokenized ES f… (Mar 24, 2026)
2a69c36  Update rag/svr/task_executor.py (ahmadintisar, Mar 24, 2026)
d4dc5f6  fix(task_executor): reload KB parser_config field_map when task snaps… (Mar 24, 2026)
66b7673  fix(task_executor): reload KB field_map for table metadata ES keys; a… (Mar 24, 2026)
fb67de1  Downgraded logs from logging.info to logging.debug (Mar 24, 2026)
1fda78e  fix(table): merge field_map and table_column_names across Excel sheet… (Mar 24, 2026)
c371058  Helper methods, and unit tests covered (Mar 24, 2026)
f0c2733  Merge branch 'main' into feature/table-parser-column-roles (JinHai-CN, Mar 25, 2026)
8c4f7a6  Merge branch 'main' into feature/table-parser-column-roles (yingfeng, Mar 25, 2026)
707e76c  fix: restore upstream ext field on ParserConfig removed during confli… (Apr 1, 2026)
804e5d2  test(table-parser): add 8 integration tests for chunk() with column r… (Apr 1, 2026)
b7b5d0e  fix(test): mock deepdoc/OCR imports to prevent ONNX model load failur… (Apr 1, 2026)
00275c3  Merge branch 'infiniflow:main' into feature/table-parser-column-roles (ahmadintisar, Apr 1, 2026)
10 changes: 10 additions & 0 deletions api/utils/validation_utils.py
@@ -374,6 +374,9 @@ class AutoMetadataConfig(Base):
fields: Annotated[list[AutoMetadataField], Field(default_factory=list)]


TableColumnRole = Literal["vectorize", "metadata", "both"]


class ParserConfig(Base):
auto_keywords: Annotated[int, Field(default=0, ge=0, le=32)]
auto_questions: Annotated[int, Field(default=0, ge=0, le=10)]
@@ -389,6 +392,13 @@ class ParserConfig(Base):
task_page_size: Annotated[int | None, Field(default=None, ge=1)]
pages: Annotated[list[list[int]] | None, Field(default=None)]
ext: Annotated[dict, Field(default={})]
# Table parser: "auto" = all columns both (default), "manual" = use table_column_roles. None → treated as "auto".
table_column_mode: Annotated[Literal["auto", "manual"] | None, Field(default=None)]
# Table parser: column name -> "vectorize" | "metadata" | "both". Used only when table_column_mode == "manual".
table_column_roles: Annotated[dict[str, TableColumnRole] | None, Field(default=None)]
# Table parser: list of column names (set by backend after first parse; used by frontend for role selector).
table_column_names: Annotated[list[str] | None, Field(default=None)]


class CreateDatasetReq(Base):
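For orientation, the sketch below shows what a dataset-level parser_config exercising the new fields might look like. It is an illustration only; the column names (url, body, price) and surrounding values are invented, not taken from this PR. When table_column_mode is left unset or set to "auto", the parser keeps its previous behaviour: every column is treated as "both".

```python
# Illustrative sketch only: a parser_config using the new table-parser fields.
# The column names ("url", "body", "price") are invented for this example.
parser_config = {
    "table_column_mode": "manual",   # "auto" or None means: every column is "both"
    "table_column_roles": {          # consulted only when table_column_mode == "manual"
        "url": "metadata",           # stored/filterable, not embedded into chunk text
        "body": "vectorize",         # embedded into chunk text, not stored as a field
        "price": "both",             # embedded and stored
    },
    # Populated by the backend after the first parse so the UI can render the role selector.
    "table_column_names": ["url", "body", "price"],
}
```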
137 changes: 109 additions & 28 deletions rag/app/table.py
@@ -35,6 +35,7 @@
from deepdoc.parser import ExcelParser
from common import settings

logger = logging.getLogger(__name__)

class Excel(ExcelParser):
def __call__(self, fnm, binary=None, from_page=0, to_page=10000000000, callback=None, **kwargs):
@@ -371,6 +372,11 @@ def chunk(filename, binary=None, from_page=0, to_page=10000000000, lang="Chinese

Every row in table will be treated as a chunk.
"""
_pc0 = kwargs.get("parser_config") or {}
logger.debug(f"[TABLE_PARSER_DEBUG] parser_config keys: {list(_pc0.keys())}")
logger.debug(f"[TABLE_PARSER_DEBUG] table_column_mode: {_pc0.get('table_column_mode')}")
logger.debug(f"[TABLE_PARSER_DEBUG] table_column_roles: {_pc0.get('table_column_roles')}")

tbls = []
is_english = lang.lower() == "english"
if re.search(r"\.xlsx?$", filename, re.IGNORECASE):
@@ -434,6 +440,19 @@
# Field type suffixes for database columns
# Maps data types to their database field suffixes
fields_map = {"text": "_tks", "int": "_long", "keyword": "_kwd", "float": "_flt", "datetime": "_dt", "bool": "_kwd"}
parser_config = kwargs.get("parser_config") or {}
if parser_config.get("table_column_mode") == "manual":
column_roles = parser_config.get("table_column_roles") or {}
else:
column_roles = {}
logger.debug(
f"[TABLE_PARSER_DEBUG] effective table_column_mode={parser_config.get('table_column_mode')!r}, "
f"column_roles keys={list(column_roles.keys())}"
)

# Pass 1: infer columns per sheet (multi-sheet Excel => multiple DataFrames). Merge field_map and
# table_column_names, then update KB once so the UI role selector sees all columns, not only the last sheet.
sheet_specs = []
for df in dfs:
for n in ["id", "_id", "index", "idx"]:
if n in df.columns:
@@ -456,50 +475,112 @@
txts.extend([str(c) for c in cln if c])
clmns_map = [(py_clmns[i].lower() + fields_map[clmn_tys[i]], str(clmns[i]).replace("_", " ")) for i in
range(len(clmns))]
# For Infinity/OceanBase: Use original column names as keys since they're stored in chunk_data JSON
# For ES/OS: Use full field names with type suffixes (e.g., url_kwd, body_tks)
# field_map: only columns stored in chunk_data (metadata or both) — used for retrieval/SQL
stored_indices = [
i for i in range(len(clmns))
if column_roles.get(clmns[i], "both") in ("metadata", "both")
]
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
# For Infinity/OceanBase: key = original column name, value = display name
field_map = {py_clmns[i].lower(): str(clmns[i]).replace("_", " ") for i in range(len(clmns))}
field_map = {
py_clmns[i].lower(): str(clmns[i]).replace("_", " ")
for i in stored_indices
}
else:
# For ES/OS: key = typed field name, value = display name
field_map = {k: v for k, v in clmns_map}
logging.debug(f"Field map: {field_map}")
KnowledgebaseService.update_parser_config(kwargs["kb_id"], {"field_map": field_map})
field_map = {
clmns_map[i][0]: clmns_map[i][1]
for i in stored_indices
}
logging.debug(f"Field map (sheet): {field_map}")
sheet_specs.append(
{
"df": df,
"clmns": clmns,
"clmn_tys": clmn_tys,
"clmns_map": clmns_map,
"py_clmns": py_clmns,
"field_map": field_map,
}
)

merged_field_map = {}
merged_table_column_names = []
seen_col = set()
for spec in sheet_specs:
merged_field_map.update(spec["field_map"])
for col in spec["clmns"]:
if col not in seen_col:
seen_col.add(col)
merged_table_column_names.append(col)

logging.debug(f"Field map (merged across sheets): {merged_field_map}")
KnowledgebaseService.update_parser_config(
kwargs["kb_id"],
{"field_map": merged_field_map, "table_column_names": merged_table_column_names},
)

eng = lang.lower() == "english" # is_english(txts)
for spec in sheet_specs:
df = spec["df"]
clmns = spec["clmns"]
clmn_tys = spec["clmn_tys"]
clmns_map = spec["clmns_map"]
py_clmns = spec["py_clmns"]
_debug_row_idx = 0
for ii, row in df.iterrows():
_debug_row_idx += 1
d = {"docnm_kwd": filename, "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename))}
row_fields = []
data_json = {} # For Infinity: Store all columns in a JSON object
text_fields = [] # vectorize + both -> content_with_weight
stored = {} # metadata + both -> chunk_data (Infinity) or typed fields (ES)
for j in range(len(clmns)):
if row[clmns[j]] is None:
continue
if not str(row[clmns[j]]):
continue
if not isinstance(row[clmns[j]], pd.Series) and pd.isna(row[clmns[j]]):
continue
# For Infinity/OceanBase: Store in chunk_data JSON column
# For Elasticsearch/OpenSearch: Store as individual fields with type suffixes
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
data_json[str(clmns[j])] = row[clmns[j]]
else:
fld = clmns_map[j][0]
d[fld] = row[clmns[j]] if clmn_tys[j] != "text" else rag_tokenizer.tokenize(row[clmns[j]])
row_fields.append((clmns[j], row[clmns[j]]))
if not row_fields:
col_name = clmns[j]
role = column_roles.get(col_name, "both")
if _debug_row_idx == 1:
logger.debug(f"[TABLE_PARSER_DEBUG] Column '{col_name}' -> role '{role}'")
if role in ("vectorize", "both"):
text_fields.append((col_name, row[col_name]))
if role in ("metadata", "both"):
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
stored[str(col_name)] = row[col_name]
else:
fld = clmns_map[j][0]
if clmn_tys[j] != "text":
stored[fld] = row[col_name]
else:
cell = row[col_name]
stored[fld] = rag_tokenizer.tokenize(cell)
raw_s = str(cell).strip() if cell is not None else ""
if raw_s:
stored[f"{py_clmns[j].lower()}_raw"] = raw_s
if not text_fields and not stored:
continue
# Add the data JSON field to the document (for Infinity/OceanBase)
if settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE:
d["chunk_data"] = data_json
# Format as a structured text for better LLM comprehension
# Format each field as "- Field Name: Value" on separate lines
formatted_text = "\n".join([f"- {field}: {value}" for field, value in row_fields])
if stored:
d["chunk_data"] = stored
else:
d.update(stored)
formatted_text = "\n".join([f"- {field}: {value}" for field, value in text_fields]) if text_fields else ""
tokenize(d, formatted_text, eng)
if _debug_row_idx == 1:
logger.debug(
f"[TABLE_PARSER_DEBUG] Chunk content_with_weight length: {len(d.get('content_with_weight', '') or '')}"
)
_cd = d.get("chunk_data")
logger.debug(
f"[TABLE_PARSER_DEBUG] Chunk chunk_data keys: {list(_cd.keys()) if isinstance(_cd, dict) else 'N/A'}"
)
if not (settings.DOC_ENGINE_INFINITY or settings.DOC_ENGINE_OCEANBASE):
_extra = [k for k in d if k not in ("docnm_kwd", "title_tks", "content_with_weight", "content_ltks", "content_sm_ltks")]
logger.debug(f"[TABLE_PARSER_DEBUG] Chunk ES extra field keys (sample): {_extra[:20]}")
res.append(d)
if tbls:
doc = {"docnm_kwd": filename, "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename))}
res.extend(tokenize_table(tbls, doc, is_english))
callback(0.35, "")

return res
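To make the role handling above concrete, here is a minimal, self-contained sketch of the per-row split performed by the new loop. The column names are invented, and the sketch deliberately ignores tokenization, type suffixes, raw-value duplication and the Infinity/ES branching, so it approximates the behaviour rather than reproducing the PR's code.

```python
# Simplified sketch of the per-row role split in chunk() above.
# Column names are invented; the real code also applies type suffixes (_tks, _kwd, ...)
# and runs text columns through rag_tokenizer.
def split_row_by_role(row: dict, column_roles: dict[str, str]) -> tuple[str, dict]:
    text_fields = []   # "vectorize" or "both" -> goes into content_with_weight
    stored = {}        # "metadata" or "both" -> goes into chunk_data / typed fields
    for col, value in row.items():
        if value is None or str(value) == "":
            continue
        role = column_roles.get(col, "both")  # a column absent from the mapping means "both"
        if role in ("vectorize", "both"):
            text_fields.append((col, value))
        if role in ("metadata", "both"):
            stored[col] = value
    formatted_text = "\n".join(f"- {field}: {value}" for field, value in text_fields)
    return formatted_text, stored

# Hypothetical usage:
text, meta = split_row_by_role(
    {"url": "https://example.com", "body": "Great product", "price": 9.99},
    {"url": "metadata", "body": "vectorize", "price": "both"},
)
# text == "- body: Great product\n- price: 9.99"
# meta == {"url": "https://example.com", "price": 9.99}
```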
53 changes: 52 additions & 1 deletion rag/svr/task_executor.py
@@ -79,9 +79,11 @@
from common.exceptions import TaskCanceledException
from common import settings
from common.constants import PAGERANK_FLD, TAG_FLD, SVR_CONSUMER_GROUP_NAME
from rag.utils.table_es_metadata import aggregate_table_manual_doc_metadata, merge_table_parser_config_from_kb

BATCH_SIZE = 64


FACTORY = {
"general": naive,
ParserType.NAIVE.value: naive,
@@ -268,6 +270,16 @@ async def build_chunks(task, progress_callback):
logging.exception("Chunking {}/{} got exception".format(task["location"], task["name"]))
raise

# Table parser column roles / mode are stored on the dataset (KB) parser_config;
# chunk tasks carry document-level parser_config only — merge KB keys so manual roles apply.
parser_config_for_chunk = merge_table_parser_config_from_kb(task)
if task.get("parser_id", "").lower() == "table" and task.get("kb_parser_config"):
logging.debug(
"[TASK_EXECUTOR_DEBUG] table parser: merged KB keys into parser_config for chunk; "
f"mode={parser_config_for_chunk.get('table_column_mode')}, "
f"roles_keys={list((parser_config_for_chunk.get('table_column_roles') or {}).keys())}"
)

try:
async with chunk_limiter:
cks = await thread_pool_exec(
@@ -279,7 +291,7 @@ async def build_chunks(task, progress_callback):
lang=task["language"],
callback=progress_callback,
kb_id=task["kb_id"],
parser_config=task["parser_config"],
parser_config=parser_config_for_chunk,
tenant_id=task["tenant_id"],
)
logging.info("Chunking({}) {}/{} done".format(timer() - st, task["location"], task["name"]))
@@ -1177,6 +1189,45 @@

DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)

# Table parser (manual): push metadata/both column values to document-level metadata for UI / chat filters
if task.get("parser_id", "").lower() == "table":
eff_pc = merge_table_parser_config_from_kb(task)
logging.debug(
f"[TABLE_META_DEBUG] table post-index: table_column_mode={eff_pc.get('table_column_mode')!r}"
)
if eff_pc.get("table_column_mode") == "manual":
try:
agg = aggregate_table_manual_doc_metadata(chunks, task)
logging.debug(f"[TABLE_META_DEBUG] aggregated metadata: {agg}")
if not agg:
logging.debug(
"[TABLE_META_DEBUG] skip update_document_metadata: empty aggregate (see logs above)"
)
if agg:
existing = DocMetadataService.get_document_metadata(task_doc_id)
existing = existing if isinstance(existing, dict) else {}
merged = update_metadata_to(dict(existing), agg)
logging.debug(
f"[TABLE_META_DEBUG] calling update_document_metadata for doc_id={task_doc_id}, "
f"meta_fields keys={list(merged.keys())}"
)
try:
DocMetadataService.update_document_metadata(task_doc_id, merged)
logging.debug("[TABLE_META_DEBUG] update_document_metadata succeeded")
except Exception as ue:
logging.error(
"update_document_metadata failed (table parser, doc_id=%s): %s",
task_doc_id,
ue,
exc_info=True,
)
except Exception as e:
logging.exception(
"Table parser document metadata aggregation failed (doc_id=%s): %s",
task_doc_id,
e,
)

progress_callback(msg="Indexing done ({:.2f}s).".format(timer() - start_ts))

if toc_thread:
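The two helpers imported above (merge_table_parser_config_from_kb and aggregate_table_manual_doc_metadata) live in rag/utils/table_es_metadata, which is not shown in this view. Based on the comments and the call sites, a merge helper along the following lines would be consistent with the described behaviour; this is a hypothetical sketch, not the PR's implementation.

```python
# Hypothetical sketch, not the PR's actual implementation. The call sites above
# suggest: start from the document-level parser_config carried on the task and
# fill in the table-role keys from the KB-level parser_config where missing.
TABLE_ROLE_KEYS = ("table_column_mode", "table_column_roles", "table_column_names")

def merge_table_parser_config_from_kb(task: dict) -> dict:
    merged = dict(task.get("parser_config") or {})
    if (task.get("parser_id") or "").lower() != "table":
        return merged
    kb_pc = task.get("kb_parser_config") or {}
    for key in TABLE_ROLE_KEYS:
        # KB-level settings only fill gaps; an explicit doc-level value wins.
        if merged.get(key) is None and kb_pc.get(key) is not None:
            merged[key] = kb_pc[key]
    return merged
```

By the same reading, aggregate_table_manual_doc_metadata would walk the produced chunks and collect the metadata-role column values into a single document-level dict, which the post-index block above then merges into the existing document metadata via update_metadata_to.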