Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing status #4366

Draft
wants to merge 8 commits into
base: improve-index-attempt-display
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion backend/onyx/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,6 @@ class DocumentRetrievalFeedback(Base):
feedback: Mapped[SearchFeedbackType | None] = mapped_column(
Enum(SearchFeedbackType, native_enum=False), nullable=True
)

chat_message: Mapped[ChatMessage] = relationship(
"ChatMessage",
back_populates="document_feedbacks",
Expand Down
31 changes: 31 additions & 0 deletions backend/onyx/db/sync_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,37 @@
logger = setup_logger()


def fetch_paginated_sync_records(
db_session: Session,
entity_id: int,
sync_type: SyncType,
page_num: int,
page_size: int,
) -> tuple[list[SyncRecord], int]:
total_count = (
db_session.query(SyncRecord)
.filter(
SyncRecord.entity_id == entity_id,
SyncRecord.sync_type == sync_type,
)
.count()
)

sync_records = (
db_session.query(SyncRecord)
.filter(
SyncRecord.entity_id == entity_id,
SyncRecord.sync_type == sync_type,
)
.order_by(SyncRecord.sync_start_time.desc())
.offset(page_num * page_size)
.limit(page_size)
.all()
)

return sync_records, total_count


def insert_sync_record(
db_session: Session,
entity_id: int,
Expand Down
83 changes: 83 additions & 0 deletions backend/onyx/server/documents/cc_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from ee.onyx.db.user_group import fetch_user_group
from onyx.auth.users import current_curator_or_admin_user
from onyx.auth.users import current_user
from onyx.background.celery.celery_utils import get_deletion_attempt_snapshot
Expand Down Expand Up @@ -41,6 +42,7 @@
from onyx.db.engine import get_session
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import SyncType
from onyx.db.index_attempt import count_index_attempt_errors_for_cc_pair
from onyx.db.index_attempt import count_index_attempts_for_connector
from onyx.db.index_attempt import get_index_attempt_errors_for_cc_pair
Expand All @@ -50,6 +52,7 @@
from onyx.db.models import User
from onyx.db.search_settings import get_active_search_settings_list
from onyx.db.search_settings import get_current_search_settings
from onyx.db.sync_record import fetch_paginated_sync_records
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_pool import get_redis_client
from onyx.server.documents.models import CCPairFullInfo
Expand All @@ -60,6 +63,7 @@
from onyx.server.documents.models import DocumentSyncStatus
from onyx.server.documents.models import IndexAttemptSnapshot
from onyx.server.documents.models import PaginatedReturn
from onyx.server.documents.models import SyncRecordSnapshot
from onyx.server.models import StatusResponse
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import fetch_ee_implementation_or_noop
Expand All @@ -69,6 +73,85 @@
router = APIRouter(prefix="/manage")


@router.get("/admin/user-group/{group_id}/sync-status")
def get_user_group_sync_status(
group_id: int,
page_num: int = Query(0, ge=0),
page_size: int = Query(10, ge=1, le=1000),
user: User | None = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
) -> PaginatedReturn[SyncRecordSnapshot]:
user_group = fetch_user_group(db_session, group_id)
if not user_group:
raise HTTPException(status_code=404, detail="User group not found")

sync_records, total_count = fetch_paginated_sync_records(
db_session, group_id, SyncType.USER_GROUP, page_num, page_size
)

return PaginatedReturn(
items=[
SyncRecordSnapshot.from_sync_record_db_model(sync_record)
for sync_record in sync_records
],
total_items=total_count,
)


@router.post("/admin/user-group/{group_id}/sync")
def sync_user_group(
group_id: int,
user: User | None = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
) -> StatusResponse[None]:
"""Triggers sync of a user group immediately"""
get_current_tenant_id()

user_group = fetch_user_group(db_session, group_id)
if not user_group:
raise HTTPException(status_code=404, detail="User group not found")

# Add logic to actually trigger the sync - this would depend on your implementation
# For example:
# try_creating_usergroup_sync_task(primary_app, group_id, get_redis_client(), tenant_id)

logger.info(f"User group sync queued: group_id={group_id}")

return StatusResponse(
success=True,
message="Successfully created the user group sync task.",
)


@router.get("/admin/cc-pair/{cc_pair_id}/sync-status")
def get_cc_pair_sync_status(
cc_pair_id: int,
page_num: int = Query(0, ge=0),
page_size: int = Query(10, ge=1, le=1000),
user: User | None = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
) -> PaginatedReturn[SyncRecordSnapshot]:
cc_pair = get_connector_credential_pair_from_id_for_user(
cc_pair_id, db_session, user, get_editable=False
)
if not cc_pair:
raise HTTPException(
status_code=400, detail="CC Pair not found for current user permissions"
)

sync_records, total_count = fetch_paginated_sync_records(
db_session, cc_pair_id, SyncType.EXTERNAL_PERMISSIONS, page_num, page_size
)

return PaginatedReturn(
items=[
SyncRecordSnapshot.from_sync_record_db_model(sync_record)
for sync_record in sync_records
],
total_items=total_count,
)


@router.get("/admin/cc-pair/{cc_pair_id}/index-attempts")
def get_cc_pair_index_attempts(
cc_pair_id: int,
Expand Down
24 changes: 24 additions & 0 deletions backend/onyx/server/documents/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
from onyx.connectors.models import InputType
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import SyncStatus
from onyx.db.enums import SyncType
from onyx.db.models import Connector
from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import Credential
from onyx.db.models import Document as DbDocument
from onyx.db.models import IndexAttempt
from onyx.db.models import IndexingStatus
from onyx.db.models import SyncRecord
from onyx.db.models import TaskStatus
from onyx.server.models import FullUserSnapshot
from onyx.server.models import InvitedUserSnapshot
Expand Down Expand Up @@ -67,6 +70,26 @@ class ConnectorBase(BaseModel):
indexing_start: datetime | None = None


class SyncRecordSnapshot(BaseModel):
id: int
entity_id: int
sync_type: SyncType
created_at: datetime
num_docs_synced: int
sync_status: SyncStatus

@classmethod
def from_sync_record_db_model(cls, sync_record: SyncRecord) -> "SyncRecordSnapshot":
return cls(
id=sync_record.id,
entity_id=sync_record.entity_id,
sync_type=sync_record.sync_type,
created_at=sync_record.sync_start_time,
num_docs_synced=sync_record.num_docs_synced,
sync_status=sync_record.sync_status,
)


class ConnectorUpdateRequest(ConnectorBase):
access_type: AccessType
groups: list[int] = Field(default_factory=list)
Expand Down Expand Up @@ -194,6 +217,7 @@ def from_index_attempt_db_model(
InvitedUserSnapshot,
ChatSessionMinimal,
IndexAttemptErrorPydantic,
SyncRecordSnapshot,
)


Expand Down
Loading
Loading