Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions env.example
Original file line number Diff line number Diff line change
Expand Up @@ -390,13 +390,22 @@ POSTGRES_VCHORDRQ_PROBES=
POSTGRES_VCHORDRQ_EPSILON=1.9

### PostgreSQL Connection Retry Configuration (Network Robustness)
### Number of retry attempts (1-10, default: 3)
### Initial retry backoff in seconds (0.1-5.0, default: 0.5)
### Maximum retry backoff in seconds (backoff-60.0, default: 5.0)
### NEW DEFAULTS (v1.4.10+): Optimized for HA deployments with ~30s switchover time
### These defaults provide out-of-the-box support for PostgreSQL High Availability setups
###
### Number of retry attempts (1-100, default: 10)
### - The default of 10 attempts allows ~225s of total retry time (sufficient for most HA scenarios)
### - For extreme cases: increase up to 20-50
### Initial retry backoff in seconds (0.1-300.0, default: 3.0)
### - The default of 3.0s provides a reasonable initial delay for switchover detection
### - For faster recovery: decrease to 1.0-2.0
### Maximum retry backoff in seconds (must be >= the initial retry backoff; max: 600.0, default: 30.0)
### - Default 30.0s matches typical switchover completion time
### - For longer switchovers: increase to 60-90
### Connection pool close timeout in seconds (1.0-30.0, default: 5.0)
# POSTGRES_CONNECTION_RETRIES=3
# POSTGRES_CONNECTION_RETRY_BACKOFF=0.5
# POSTGRES_CONNECTION_RETRY_BACKOFF_MAX=5.0
# POSTGRES_CONNECTION_RETRIES=10
# POSTGRES_CONNECTION_RETRY_BACKOFF=3.0
# POSTGRES_CONNECTION_RETRY_BACKOFF_MAX=30.0
# POSTGRES_POOL_CLOSE_TIMEOUT=5.0

### PostgreSQL SSL Configuration (Optional)
Expand Down
97 changes: 57 additions & 40 deletions lightrag/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,59 +266,51 @@ def parse_args() -> argparse.Namespace:
help="Enable DOCLING document loading engine (default: from env or DEFAULT)",
)

# Conditionally add binding options defined in binding_options module
# This will add command line arguments for all binding options (e.g., --ollama-embedding-num_ctx)
# and corresponding environment variables (e.g., OLLAMA_EMBEDDING_NUM_CTX)
# Conditionally add binding-specific options (Ollama, OpenAI, Azure OpenAI, Gemini)
# This registers command line arguments (e.g., --openai-llm-temperature)
# and reads corresponding environment variables (e.g., OPENAI_LLM_TEMPERATURE)

# Determine LLM binding value consistently from command line or environment
llm_binding_value = None
if "--llm-binding" in sys.argv:
try:
idx = sys.argv.index("--llm-binding")
if idx + 1 < len(sys.argv) and sys.argv[idx + 1] == "ollama":
OllamaLLMOptions.add_args(parser)
if idx + 1 < len(sys.argv) and not sys.argv[idx + 1].startswith("-"):
llm_binding_value = sys.argv[idx + 1]
except IndexError:
pass
elif os.environ.get("LLM_BINDING") == "ollama":

# Fall back to environment variable using same function as argparse default
if llm_binding_value is None:
llm_binding_value = get_env_value("LLM_BINDING", "ollama")

# Add LLM binding options based on determined value
if llm_binding_value == "ollama":
OllamaLLMOptions.add_args(parser)
elif llm_binding_value in ["openai", "azure_openai"]:
OpenAILLMOptions.add_args(parser)
elif llm_binding_value == "gemini":
GeminiLLMOptions.add_args(parser)

# Determine embedding binding value consistently from command line or environment
embedding_binding_value = None
if "--embedding-binding" in sys.argv:
try:
idx = sys.argv.index("--embedding-binding")
if idx + 1 < len(sys.argv):
if sys.argv[idx + 1] == "ollama":
OllamaEmbeddingOptions.add_args(parser)
elif sys.argv[idx + 1] == "gemini":
GeminiEmbeddingOptions.add_args(parser)
if idx + 1 < len(sys.argv) and not sys.argv[idx + 1].startswith("-"):
embedding_binding_value = sys.argv[idx + 1]
except IndexError:
pass
else:
env_embedding_binding = os.environ.get("EMBEDDING_BINDING")
if env_embedding_binding == "ollama":
OllamaEmbeddingOptions.add_args(parser)
elif env_embedding_binding == "gemini":
GeminiEmbeddingOptions.add_args(parser)

# Add OpenAI LLM options when llm-binding is openai or azure_openai
if "--llm-binding" in sys.argv:
try:
idx = sys.argv.index("--llm-binding")
if idx + 1 < len(sys.argv) and sys.argv[idx + 1] in [
"openai",
"azure_openai",
]:
OpenAILLMOptions.add_args(parser)
except IndexError:
pass
elif os.environ.get("LLM_BINDING") in ["openai", "azure_openai"]:
OpenAILLMOptions.add_args(parser)
# Fall back to environment variable using same function as argparse default
if embedding_binding_value is None:
embedding_binding_value = get_env_value("EMBEDDING_BINDING", "ollama")

if "--llm-binding" in sys.argv:
try:
idx = sys.argv.index("--llm-binding")
if idx + 1 < len(sys.argv) and sys.argv[idx + 1] == "gemini":
GeminiLLMOptions.add_args(parser)
except IndexError:
pass
elif os.environ.get("LLM_BINDING") == "gemini":
GeminiLLMOptions.add_args(parser)
# Add embedding binding options based on determined value
if embedding_binding_value == "ollama":
OllamaEmbeddingOptions.add_args(parser)
elif embedding_binding_value == "gemini":
GeminiEmbeddingOptions.add_args(parser)

args = parser.parse_args()

Expand Down Expand Up @@ -542,19 +534,44 @@ class _GlobalArgsProxy:

This maintains backward compatibility with existing code while
allowing programmatic control over initialization timing.

The proxy fully delegates to the underlying argparse.Namespace,
including support for vars() calls which is used by binding_options
to extract provider-specific configuration options.
"""

def __getattr__(self, name):
def __getattribute__(self, name):
    """Resolve attribute reads against the lazily initialized namespace.

    ``__dict__`` is answered with the attribute dict of ``_global_args`` so
    that ``vars(proxy)`` exposes the parsed arguments. The names
    ``__class__``, ``__repr__``, ``__getattribute__`` and ``__setattr__``
    are resolved on the proxy object itself. Every other name is forwarded
    to ``_global_args``. ``initialize_config()`` is called first whenever a
    forwarded lookup happens before the configuration is initialized.
    """
    # Names served by the proxy itself never trigger initialization.
    if name in ("__class__", "__repr__", "__getattribute__", "__setattr__"):
        return object.__getattribute__(self, name)

    # Anything else needs the real namespace, so build it on first use.
    if not _initialized:
        initialize_config()

    # vars() looks up __dict__; hand back the namespace's own dict.
    if name == "__dict__":
        return vars(_global_args)
    return getattr(_global_args, name)

def __setattr__(self, name, value):
    """Store ``value`` under ``name`` on the underlying global namespace.

    The configuration is initialized on demand before the write, so the
    assignment always lands on ``_global_args`` rather than on the proxy.
    """
    # Only reads module-level state here, so no ``global`` declaration is needed.
    if not _initialized:
        initialize_config()
    setattr(_global_args, name, value)

def __repr__(self):
    """Return the namespace's repr, or a placeholder before initialization.

    Unlike attribute access, this never triggers ``initialize_config()``.
    """
    return (
        repr(_global_args)
        if _initialized
        else "<GlobalArgsProxy: Not initialized>"
    )
Expand Down
30 changes: 29 additions & 1 deletion lightrag/api/routers/document_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2101,13 +2101,41 @@ async def upload_to_input_dir(
uploaded file is of a supported type, saves it in the specified input directory,
indexes it for retrieval, and returns a success status with relevant details.

**Duplicate Detection Behavior:**

This endpoint handles two types of duplicate scenarios differently:

1. **Filename Duplicate (Synchronous Detection)**:
- Detected immediately before file processing
- Returns `status="duplicated"` with the existing document's track_id
- Two cases:
- If filename exists in document storage: returns existing track_id
- If filename exists in file system only: returns empty track_id ("")

2. **Content Duplicate (Asynchronous Detection)**:
- Detected during background processing after content extraction
- Returns `status="success"` with a new track_id immediately
- The duplicate is detected later when processing the file content
- Use `/documents/track_status/{track_id}` to check the final result:
- Document will have `status="FAILED"`
- `error_msg` contains "Content already exists. Original doc_id: xxx"
- `metadata.is_duplicate=true` with reference to original document
- `metadata.original_doc_id` points to the existing document
- `metadata.original_track_id` shows the original upload's track_id

**Why Different Behavior?**
- Filename check is fast (simple lookup), done synchronously
- Content extraction is expensive (PDF/DOCX parsing), done asynchronously
- This design prevents blocking the client during expensive operations

Args:
background_tasks: FastAPI BackgroundTasks for async processing
file (UploadFile): The file to be uploaded. It must have an allowed extension.

Returns:
InsertResponse: A response object containing the upload status and a message.
status can be "success", "duplicated", or error is thrown.
- status="success": File accepted and queued for processing
- status="duplicated": Filename already exists (see track_id for existing document)

Raises:
HTTPException: If the file type is not supported (400) or other errors occur (500).
Expand Down
12 changes: 6 additions & 6 deletions lightrag/kg/postgres_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1619,34 +1619,34 @@ def get_config() -> dict[str, Any]:
),
# Connection retry configuration
"connection_retry_attempts": min(
10,
100, # Increased from 10 to 100 for long-running operations
int(
os.environ.get(
"POSTGRES_CONNECTION_RETRIES",
config.get("postgres", "connection_retries", fallback=3),
config.get("postgres", "connection_retries", fallback=10),
)
),
),
"connection_retry_backoff": min(
5.0,
300.0, # Increased from 5.0 to 300.0 (5 minutes) for PG switchover scenarios
float(
os.environ.get(
"POSTGRES_CONNECTION_RETRY_BACKOFF",
config.get(
"postgres", "connection_retry_backoff", fallback=0.5
"postgres", "connection_retry_backoff", fallback=3.0
),
)
),
),
"connection_retry_backoff_max": min(
60.0,
600.0, # Increased from 60.0 to 600.0 (10 minutes) for PG switchover scenarios
float(
os.environ.get(
"POSTGRES_CONNECTION_RETRY_BACKOFF_MAX",
config.get(
"postgres",
"connection_retry_backoff_max",
fallback=5.0,
fallback=30.0,
),
)
),
Expand Down
41 changes: 35 additions & 6 deletions lightrag/lightrag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1354,17 +1354,46 @@ async def apipeline_enqueue_documents(
# Exclude IDs of documents that are already enqueued
unique_new_doc_ids = await self.doc_status.filter_keys(all_new_doc_ids)

# Log ignored document IDs (documents that were filtered out because they already exist)
# Handle duplicate documents - create trackable records with current track_id
ignored_ids = list(all_new_doc_ids - unique_new_doc_ids)
if ignored_ids:
duplicate_docs: dict[str, Any] = {}
for doc_id in ignored_ids:
file_path = new_docs.get(doc_id, {}).get("file_path", "unknown_source")
logger.warning(
f"Ignoring document ID (already exists): {doc_id} ({file_path})"
logger.warning(f"Duplicate document detected: {doc_id} ({file_path})")

# Get existing document info for reference
existing_doc = await self.doc_status.get_by_id(doc_id)
existing_status = (
existing_doc.get("status", "unknown") if existing_doc else "unknown"
)
existing_track_id = (
existing_doc.get("track_id", "") if existing_doc else ""
)
if len(ignored_ids) > 3:
logger.warning(
f"Total Ignoring {len(ignored_ids)} document IDs that already exist in storage"

# Create a new record with unique ID for this duplicate attempt
dup_record_id = compute_mdhash_id(f"{doc_id}-{track_id}", prefix="dup-")
duplicate_docs[dup_record_id] = {
"status": DocStatus.FAILED,
"content_summary": f"[DUPLICATE] Original document: {doc_id}",
"content_length": new_docs.get(doc_id, {}).get("content_length", 0),
"created_at": datetime.now(timezone.utc).isoformat(),
"updated_at": datetime.now(timezone.utc).isoformat(),
"file_path": file_path,
"track_id": track_id, # Use current track_id for tracking
"error_msg": f"Content already exists. Original doc_id: {doc_id}, Status: {existing_status}",
"metadata": {
"is_duplicate": True,
"original_doc_id": doc_id,
"original_track_id": existing_track_id,
},
}

# Store duplicate records in doc_status
if duplicate_docs:
await self.doc_status.upsert(duplicate_docs)
logger.info(
f"Created {len(duplicate_docs)} duplicate document records with track_id: {track_id}"
)

# Filter new_docs to only include documents with unique IDs
Expand Down
20 changes: 12 additions & 8 deletions lightrag_webui/src/features/DocumentManager.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ export default function DocumentManager() {
// Utility function to create timeout wrapper for API calls
const withTimeout = useCallback((
promise: Promise<any>,
timeoutMs: number = 30000,
timeoutMs: number = 30000, // Default 30s timeout for normal operations
errorMsg: string = 'Request timeout'
): Promise<any> => {
const timeoutPromise = new Promise((_, reject) => {
Expand Down Expand Up @@ -676,7 +676,8 @@ export default function DocumentManager() {
// Intelligent refresh function: handles all boundary cases
const handleIntelligentRefresh = useCallback(async (
targetPage?: number, // Optional target page, defaults to current page
resetToFirst?: boolean // Whether to force reset to first page
resetToFirst?: boolean, // Whether to force reset to first page
customTimeout?: number // Optional custom timeout in milliseconds (uses withTimeout default if not provided)
) => {
try {
if (!isMountedRef.current) return;
Expand All @@ -694,10 +695,10 @@ export default function DocumentManager() {
sort_direction: sortDirection
};

// Use timeout wrapper for the API call
// Use timeout wrapper for the API call (uses customTimeout if provided, otherwise withTimeout default)
const response = await withTimeout(
getDocumentsPaginated(request),
30000, // 30 second timeout
customTimeout, // Pass undefined to use default 30s, or explicit timeout for special cases
'Document fetch timeout'
);

Expand All @@ -717,7 +718,7 @@ export default function DocumentManager() {

const lastPageResponse = await withTimeout(
getDocumentsPaginated(lastPageRequest),
30000,
customTimeout, // Use same timeout for consistency
'Document fetch timeout'
);

Expand Down Expand Up @@ -847,7 +848,10 @@ export default function DocumentManager() {
// Reset health check timer with 1 second delay to avoid race condition
useBackendState.getState().resetHealthCheckTimerDelayed(1000);

// Start fast refresh with 2-second interval immediately after scan
// Perform immediate refresh with 90s timeout after scan (tolerates PostgreSQL switchover)
await handleIntelligentRefresh(undefined, false, 90000);

// Start fast refresh with 2-second interval after initial refresh
startPollingInterval(2000);

// Set recovery timer to restore normal polling interval after 15 seconds
Expand All @@ -865,7 +869,7 @@ export default function DocumentManager() {
toast.error(t('documentPanel.documentManager.errors.scanFailed', { error: errorMessage(err) }));
}
}
}, [t, startPollingInterval, currentTab, health, statusCounts])
}, [t, startPollingInterval, currentTab, health, statusCounts, handleIntelligentRefresh])

// Handle page size change - update state and save to store
const handlePageSizeChange = useCallback((newPageSize: number) => {
Expand Down Expand Up @@ -1184,7 +1188,7 @@ export default function DocumentManager() {
) : !isSelectionMode ? (
<ClearDocumentsDialog onDocumentsCleared={handleDocumentsCleared} />
) : null}
<UploadDocumentsDialog onDocumentsUploaded={fetchDocuments} />
<UploadDocumentsDialog onDocumentsUploaded={() => handleIntelligentRefresh(undefined, false, 120000)} />
<PipelineStatusDialog
open={showPipelineStatus}
onOpenChange={setShowPipelineStatus}
Expand Down
Loading