-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkflow_main.py
More file actions
145 lines (119 loc) · 5.57 KB
/
workflow_main.py
File metadata and controls
145 lines (119 loc) · 5.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import logging
from pathlib import Path
from prefect import flow, get_run_logger
from prefect.context import get_run_context
from prefect.exceptions import MissingContextError
from helper import config as config_helper
from helper.config import CONFIG_PATH, check_for_config, get_local_state_db_path, \
setup_prefect_logging
from helper.constants import DOCUMENT_TYPE_CRAN
from helper.logger import get_logger_safe
from helper.planner_tools import get_plan_from_lakefs, get_cran_items_having_doc_pdf, convert_worker_plan_to_list
from tasks.state_pull import pull_state_db_from_lakefs
from tasks.init_db_task import init_db_task, get_connection
from tasks.state_push import snapshot_table_counts
from tasks.update_software_items import update_software_item_index_from_mardi
from tasks.update_lakefs_file_index import update_file_index_from_lakefs
from tasks.update_embeddings import update_embeddings, count_software_items_with_pdf_component
from tasks.state_push import push_state_db_to_lakefs
# Execution-mode markers used by start_update_embedding_workflow:
# - EXEC_MODE_USE_STATEDB is selected when no worker plan name is passed.
# - EXEC_MODE_USE_LOCAL_PLAN is selected when a worker plan name is passed.
EXEC_MODE_USE_STATEDB = "EXEC_MODE_USE_STATEDB"
EXEC_MODE_USE_LOCAL_PLAN = "EXEC_MODE_USE_LOCAL_PLAN"

# Executed at import time, before the flow below is defined.
# NOTE(review): presumably configures logging for Prefect runs - confirm in helper.config.
setup_prefect_logging()
@flow(name="start_update_embedding_workflow")
def start_update_embedding_workflow(
    update_embeddings_loop_iterations: int = 2,
    update_embeddings_embeddings_per_loop: int = 10,
    timeout_seconds: int = 100,
    max_pages: int = 100,
    worker_plan_name: str | None = None,
):
    """
    Orchestrate the end-to-end software documentation embedding sync flow.

    The flow runs in one of two modes, selected by ``worker_plan_name``:

    * No plan name: the state database is pulled from lakeFS (or initialized
      if the pull reports nothing), the software-item and file indexes are
      refreshed, the work list is read via ``get_cran_items_having_doc_pdf``,
      and the state database is pushed back after every iteration.
    * Plan name given: the work list comes from the named plan and the state
      database is neither pulled nor pushed.

    Args:
        update_embeddings_loop_iterations: Number of passes of the outer loop.
            Each pass walks the complete work list.
        update_embeddings_embeddings_per_loop: Batch size, i.e. the maximum
            number of work-list items handed to a single ``update_embeddings``
            call.
        timeout_seconds: Chunking/embedding timeout per PDF.
        max_pages: Maximum pages allowed per PDF before skipping.
        worker_plan_name: Optional plan filename (e.g., "plan_localworker_01") looked up under
            the LakeFS planned/ prefix. If provided and not found, the workflow exits
            with an error.

    Raises:
        SystemExit: With exit code 1 when ``worker_plan_name`` is given but
            no plan could be loaded for it.
    """
    logger = get_logger_safe()

    exec_mode = EXEC_MODE_USE_LOCAL_PLAN if worker_plan_name else EXEC_MODE_USE_STATEDB

    logger.info(f"Running with: iterations={update_embeddings_loop_iterations}, "
                f"per_loop={update_embeddings_embeddings_per_loop}")

    # Make sure the directory that will hold the local state DB exists.
    state_db_path: str = str(get_local_state_db_path())
    Path(state_db_path).parent.mkdir(parents=True, exist_ok=True)

    # Only populated in state-DB mode; used when pushing the state DB back.
    baseline_counts = None

    if exec_mode == EXEC_MODE_USE_LOCAL_PLAN:
        # A plan contains the files that should be embedded without the need
        # to use the state database - this allows true parallel execution.
        worker_plan = get_plan_from_lakefs(worker_plan_name)
        if not worker_plan:
            logger.error(f"Worker plan {worker_plan_name} not found. Exiting.")
            # The exception must actually be raised; a bare ``SystemExit(1)``
            # expression only builds the object and lets the flow continue
            # with a missing plan.
            raise SystemExit(1)
        cran_items_having_doc_pdf = convert_worker_plan_to_list(worker_plan)
    else:
        # "Normal" behaviour, based on the lakeFS state database.
        pulled = pull_state_db_from_lakefs()
        if not pulled:
            init_db_task()

        baseline_counts = snapshot_table_counts()

        update_software_item_index_from_mardi()
        update_file_index_from_lakefs()

        # ``.fn()`` invokes the undecorated function behind the task object.
        software_items_with_pdf_component_count = count_software_items_with_pdf_component.fn()
        logger.info(f"QIDs with components: {software_items_with_pdf_component_count}")

        # Get items that exist in KG and have a documentation pdf in lakefs
        cran_items_having_doc_pdf = get_cran_items_having_doc_pdf()

    total_components = len(cran_items_having_doc_pdf)
    logger.info(
        f"Found {total_components:,} component records; {total_components:,} pending embeddings"
    )

    # Start actual workflow tasks - OUTER LOOP
    for iteration in range(update_embeddings_loop_iterations):
        # Split into batches for this iteration - INNER LOOP
        batches = [
            cran_items_having_doc_pdf[i:i + update_embeddings_embeddings_per_loop]
            for i in range(0, len(cran_items_having_doc_pdf), update_embeddings_embeddings_per_loop)
        ]

        for batch in batches:
            if not batch:
                continue
            update_embeddings(
                document_type=DOCUMENT_TYPE_CRAN,
                timeout_seconds=timeout_seconds,
                max_pages=max_pages,
                cran_items_having_doc_pdf=batch,
            )

        # Only push new state db if in this exec mode
        if exec_mode == EXEC_MODE_USE_STATEDB:
            push_state_db_to_lakefs(baseline_counts=baseline_counts)

        completed = iteration + 1
        remaining = update_embeddings_loop_iterations - completed
        logger.info(
            "Completed iteration %s/%s; %s remaining",
            completed,
            update_embeddings_loop_iterations,
            remaining,
        )
if __name__ == "__main__":
    # Console logger for direct script execution; attach a stream handler
    # only if none is registered yet, so messages are not duplicated.
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        logger.addHandler(logging.StreamHandler())

    # Probe for an active Prefect run context; outside a run,
    # get_run_context() raises MissingContextError.
    try:
        get_run_context()
        has_prefect_context = True
    except MissingContextError:
        has_prefect_context = False

    if has_prefect_context:
        logger.info("Prefect environment detected; Prefect context available.")
    else:
        # Record the missing context on the shared config helper module.
        config_helper.is_prefect_environment = False
        logger.info("Prefect environment not detected; running without Prefect context.")

    # Abort with a non-zero exit status when the configuration check fails.
    if not check_for_config():
        raise SystemExit(1)

    start_update_embedding_workflow()
# https://github.com/shanojpillai/qdrant-rag-pro